diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1bcdc5f..4397e0c 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2260,7 +2260,6 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "Whether to generate the splits locally or in the AM (tez only)"), HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits", true, "Whether to generate consistent split locations when generating splits in the AM"), - HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez/Spark (Hadoop 2 only)"), HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez/Spark (Hadoop 2 only)"), @@ -2473,7 +2472,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { new TimeValidator(TimeUnit.SECONDS), "How long to delay before cleaning up query files in LLAP (in seconds, for debugging).", "llap.file.cleanup.delay-seconds"), - LLAP_DAEMON_SERVICE_HOSTS("hive.llap.daemon.service.hosts", "", + LLAP_DAEMON_SERVICE_HOSTS("hive.llap.daemon.service.hosts", null, "Explicitly specified hosts to use for LLAP scheduling. Useful for testing. 
By default,\n" + "YARN registry is used.", "llap.daemon.service.hosts"), LLAP_DAEMON_SERVICE_REFRESH_INTERVAL("hive.llap.daemon.service.refresh.interval.sec", "60s", @@ -2540,6 +2539,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "llap.daemon.service.port"), LLAP_DAEMON_WEB_SSL("hive.llap.daemon.web.ssl", false, "Whether LLAP daemon web UI should use SSL.", "llap.daemon.service.ssl"), + LLAP_CLIENT_CONSISTENT_SPLITS("hive.llap.client.consistent.splits", + false, + "Whether to setup split locations to match nodes on which llap daemons are running," + + " instead of using the locations provided by the split itself"), SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", "60s", new TimeValidator(TimeUnit.SECONDS), diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java index 388b5f3..be811eb 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.llap.registry; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Set; @@ -30,6 +31,13 @@ public Map getAll(); /** + * Gets a list containing all the instances. This list has the same iteration order across + * different processes, assuming the list of registry entries is the same. + * @return + */ + public List getAllInstancesOrdered(); + + /** * Get an instance by worker identity. 
* * @param name diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index ef9de32..92044bb 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -17,8 +17,13 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; @@ -176,7 +181,8 @@ public String toString() { private final class FixedServiceInstanceSet implements ServiceInstanceSet { - private final Map instances = new HashMap(); + // LinkedHashMap have a repeatable iteration order. 
+ private final Map instances = new LinkedHashMap<>(); public FixedServiceInstanceSet() { for (String host : hosts) { @@ -191,6 +197,19 @@ public FixedServiceInstanceSet() { } @Override + public List getAllInstancesOrdered() { + List list = new LinkedList<>(); + list.addAll(instances.values()); + Collections.sort(list, new Comparator() { + @Override + public int compare(ServiceInstance o1, ServiceInstance o2) { + return o1.getWorkerIdentity().compareTo(o2.getWorkerIdentity()); // ascending by worker identity + } + }); + return list; + } + + @Override public ServiceInstance getInstance(String name) { return instances.get(name); } diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 740f373..907faed 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -14,11 +14,13 @@ package org.apache.hadoop.hive.llap.registry.impl; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; import org.apache.hadoop.service.AbstractService; @@ -32,11 +34,45 @@ private ServiceRegistry registry = null; private final boolean isDaemon; + private static final Map yarnRegistries = new HashMap<>(); + public LlapRegistryService(boolean isDaemon) { super("LlapRegistryService"); this.isDaemon = isDaemon; } + /** + * Helper method to get a ServiceRegistry instance to read from the registry. + * This should not be used by LLAP daemons. 
+ * + * @param conf {@link Configuration} instance which contains service registry information. + * @return a started client-side registry service; cached and shared per name for the YARN registry, newly created for each host based list + */ + public static synchronized LlapRegistryService getClient(Configuration conf) { + String hosts = HiveConf.getTrimmedVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS); + Preconditions.checkNotNull(hosts, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.toString() + " must be defined"); + LlapRegistryService registry; + if (hosts.startsWith("@")) { + // Caching instances only in case of the YARN registry. Each host based list will get its own copy. + String name = hosts.substring(1); + if (yarnRegistries.containsKey(name)) { + registry = yarnRegistries.get(name); + } else { + registry = new LlapRegistryService(false); + registry.init(conf); + registry.start(); + yarnRegistries.put(name, registry); + } + } else { + registry = new LlapRegistryService(false); + registry.init(conf); + registry.start(); + } + LOG.info("Using LLAP registry (client) type: " + registry); + return registry; + } + + + @Override public void serviceInit(Configuration conf) { String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java index fc2ebf2..efe31cc 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java @@ -20,15 +20,20 @@ import java.net.URISyntaxException; import java.net.URL; import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.Executors; import 
java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; @@ -269,16 +274,47 @@ public int getManagementPort() { // LinkedHashMap to retain iteration order. private final Map instances = new LinkedHashMap<>(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); @Override - public synchronized Map getAll() { + public Map getAll() { // Return a copy. Instances may be modified during a refresh. - return new LinkedHashMap<>(instances); + readLock.lock(); + try { + return new LinkedHashMap<>(instances); + } finally { + readLock.unlock(); + } } @Override - public synchronized ServiceInstance getInstance(String name) { - return instances.get(name); + public List getAllInstancesOrdered() { + List list = new LinkedList<>(); + readLock.lock(); + try { + list.addAll(instances.values()); + } finally { + readLock.unlock(); + } + Collections.sort(list, new Comparator() { + @Override + public int compare(ServiceInstance o1, ServiceInstance o2) { + return o1.getWorkerIdentity().compareTo(o2.getWorkerIdentity()); // ascending by worker identity + } + }); + return list; + } + + @Override + public ServiceInstance getInstance(String name) { + readLock.lock(); + try { + return instances.get(name); + } finally { + readLock.unlock(); + } } @Override @@ -290,7 +326,8 @@ public void refresh() throws IOException { Map records = RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path)); // Synchronize after reading the service records from the external service (ZK) - synchronized (this) { + writeLock.lock(); + try { Set latestKeys = new HashSet(); LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this)); for 
(ServiceRecord rec : records.values()) { @@ -333,28 +370,34 @@ public void refresh() throws IOException { } else { this.instances.putAll(freshInstances); } + } finally { + writeLock.unlock(); } } @Override - public synchronized Set getByHost(String host) { + public Set getByHost(String host) { // TODO Maybe store this as a map which is populated during construction, to avoid walking // the map on each request. + readLock.lock(); Set byHost = new HashSet(); - - for (ServiceInstance i : instances.values()) { - if (host.equals(i.getHost())) { - // all hosts in instances should be alive in this impl - byHost.add(i); + try { + for (ServiceInstance i : instances.values()) { + if (host.equals(i.getHost())) { + // all hosts in instances should be alive in this impl + byHost.add(i); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Locality comparing " + host + " to " + i.getHost()); + } } if (LOG.isDebugEnabled()) { - LOG.debug("Locality comparing " + host + " to " + i.getHost()); + LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); } + return byHost; + } finally { + readLock.unlock(); } - if (LOG.isDebugEnabled()) { - LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); - } - return byHost; } } diff --git llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index d2180e5..4ab7b32 100644 --- llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -3245,6 +3245,16 @@ public Builder clearMergedInputDescriptor() { com.google.protobuf.ByteString getDagNameBytes(); + // optional int32 dag_id = 11; + /** + * optional int32 dag_id = 11; + */ + boolean hasDagId(); + /** + * optional int32 dag_id 
= 11; + */ + int getDagId(); + // optional string vertex_name = 3; /** * optional string vertex_name = 3; @@ -3441,13 +3451,13 @@ private FragmentSpecProto( break; } case 26: { - bitField0_ |= 0x00000004; + bitField0_ |= 0x00000008; vertexName_ = input.readBytes(); break; } case 34: { org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto.Builder subBuilder = null; - if (((bitField0_ & 0x00000008) == 0x00000008)) { + if (((bitField0_ & 0x00000010) == 0x00000010)) { subBuilder = processorDescriptor_.toBuilder(); } processorDescriptor_ = input.readMessage(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto.PARSER, extensionRegistry); @@ -3455,48 +3465,53 @@ private FragmentSpecProto( subBuilder.mergeFrom(processorDescriptor_); processorDescriptor_ = subBuilder.buildPartial(); } - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000010; break; } case 42: { - if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) { + if (!((mutable_bitField0_ & 0x00000020) == 0x00000020)) { inputSpecs_ = new java.util.ArrayList(); - mutable_bitField0_ |= 0x00000010; + mutable_bitField0_ |= 0x00000020; } inputSpecs_.add(input.readMessage(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto.PARSER, extensionRegistry)); break; } case 50: { - if (!((mutable_bitField0_ & 0x00000020) == 0x00000020)) { + if (!((mutable_bitField0_ & 0x00000040) == 0x00000040)) { outputSpecs_ = new java.util.ArrayList(); - mutable_bitField0_ |= 0x00000020; + mutable_bitField0_ |= 0x00000040; } outputSpecs_.add(input.readMessage(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto.PARSER, extensionRegistry)); break; } case 58: { - if (!((mutable_bitField0_ & 0x00000040) == 0x00000040)) { + if (!((mutable_bitField0_ & 0x00000080) == 0x00000080)) { groupedInputSpecs_ = new java.util.ArrayList(); - mutable_bitField0_ |= 0x00000040; + mutable_bitField0_ |= 0x00000080; } 
groupedInputSpecs_.add(input.readMessage(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto.PARSER, extensionRegistry)); break; } case 64: { - bitField0_ |= 0x00000010; + bitField0_ |= 0x00000020; vertexParallelism_ = input.readInt32(); break; } case 72: { - bitField0_ |= 0x00000020; + bitField0_ |= 0x00000040; fragmentNumber_ = input.readInt32(); break; } case 80: { - bitField0_ |= 0x00000040; + bitField0_ |= 0x00000080; attemptNumber_ = input.readInt32(); break; } + case 88: { + bitField0_ |= 0x00000004; + dagId_ = input.readInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -3505,13 +3520,13 @@ private FragmentSpecProto( throw new com.google.protobuf.InvalidProtocolBufferException( e.getMessage()).setUnfinishedMessage(this); } finally { - if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) { + if (((mutable_bitField0_ & 0x00000020) == 0x00000020)) { inputSpecs_ = java.util.Collections.unmodifiableList(inputSpecs_); } - if (((mutable_bitField0_ & 0x00000020) == 0x00000020)) { + if (((mutable_bitField0_ & 0x00000040) == 0x00000040)) { outputSpecs_ = java.util.Collections.unmodifiableList(outputSpecs_); } - if (((mutable_bitField0_ & 0x00000040) == 0x00000040)) { + if (((mutable_bitField0_ & 0x00000080) == 0x00000080)) { groupedInputSpecs_ = java.util.Collections.unmodifiableList(groupedInputSpecs_); } this.unknownFields = unknownFields.build(); @@ -3632,6 +3647,22 @@ public boolean hasDagName() { } } + // optional int32 dag_id = 11; + public static final int DAG_ID_FIELD_NUMBER = 11; + private int dagId_; + /** + * optional int32 dag_id = 11; + */ + public boolean hasDagId() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int32 dag_id = 11; + */ + public int getDagId() { + return dagId_; + } + // optional string vertex_name = 3; public static final int VERTEX_NAME_FIELD_NUMBER = 3; private java.lang.Object vertexName_; @@ -3639,7 +3670,7 @@ public boolean 
hasDagName() { * optional string vertex_name = 3; */ public boolean hasVertexName() { - return ((bitField0_ & 0x00000004) == 0x00000004); + return ((bitField0_ & 0x00000008) == 0x00000008); } /** * optional string vertex_name = 3; @@ -3682,7 +3713,7 @@ public boolean hasVertexName() { * optional .EntityDescriptorProto processor_descriptor = 4; */ public boolean hasProcessorDescriptor() { - return ((bitField0_ & 0x00000008) == 0x00000008); + return ((bitField0_ & 0x00000010) == 0x00000010); } /** * optional .EntityDescriptorProto processor_descriptor = 4; @@ -3812,7 +3843,7 @@ public int getGroupedInputSpecsCount() { * optional int32 vertex_parallelism = 8; */ public boolean hasVertexParallelism() { - return ((bitField0_ & 0x00000010) == 0x00000010); + return ((bitField0_ & 0x00000020) == 0x00000020); } /** * optional int32 vertex_parallelism = 8; @@ -3828,7 +3859,7 @@ public int getVertexParallelism() { * optional int32 fragment_number = 9; */ public boolean hasFragmentNumber() { - return ((bitField0_ & 0x00000020) == 0x00000020); + return ((bitField0_ & 0x00000040) == 0x00000040); } /** * optional int32 fragment_number = 9; @@ -3844,7 +3875,7 @@ public int getFragmentNumber() { * optional int32 attempt_number = 10; */ public boolean hasAttemptNumber() { - return ((bitField0_ & 0x00000040) == 0x00000040); + return ((bitField0_ & 0x00000080) == 0x00000080); } /** * optional int32 attempt_number = 10; @@ -3856,6 +3887,7 @@ public int getAttemptNumber() { private void initFields() { fragmentIdentifierString_ = ""; dagName_ = ""; + dagId_ = 0; vertexName_ = ""; processorDescriptor_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto.getDefaultInstance(); inputSpecs_ = java.util.Collections.emptyList(); @@ -3883,10 +3915,10 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getDagNameBytes()); } - if (((bitField0_ & 0x00000004) == 0x00000004)) { + 
if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeBytes(3, getVertexNameBytes()); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { + if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeMessage(4, processorDescriptor_); } for (int i = 0; i < inputSpecs_.size(); i++) { @@ -3898,15 +3930,18 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) for (int i = 0; i < groupedInputSpecs_.size(); i++) { output.writeMessage(7, groupedInputSpecs_.get(i)); } - if (((bitField0_ & 0x00000010) == 0x00000010)) { + if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeInt32(8, vertexParallelism_); } - if (((bitField0_ & 0x00000020) == 0x00000020)) { + if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeInt32(9, fragmentNumber_); } - if (((bitField0_ & 0x00000040) == 0x00000040)) { + if (((bitField0_ & 0x00000080) == 0x00000080)) { output.writeInt32(10, attemptNumber_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt32(11, dagId_); + } getUnknownFields().writeTo(output); } @@ -3924,11 +3959,11 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeBytesSize(2, getDagNameBytes()); } - if (((bitField0_ & 0x00000004) == 0x00000004)) { + if (((bitField0_ & 0x00000008) == 0x00000008)) { size += com.google.protobuf.CodedOutputStream .computeBytesSize(3, getVertexNameBytes()); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { + if (((bitField0_ & 0x00000010) == 0x00000010)) { size += com.google.protobuf.CodedOutputStream .computeMessageSize(4, processorDescriptor_); } @@ -3944,18 +3979,22 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeMessageSize(7, groupedInputSpecs_.get(i)); } - if (((bitField0_ & 0x00000010) == 0x00000010)) { + if (((bitField0_ & 0x00000020) == 0x00000020)) { size += com.google.protobuf.CodedOutputStream .computeInt32Size(8, vertexParallelism_); } - if (((bitField0_ & 0x00000020) == 0x00000020)) { + if 
(((bitField0_ & 0x00000040) == 0x00000040)) { size += com.google.protobuf.CodedOutputStream .computeInt32Size(9, fragmentNumber_); } - if (((bitField0_ & 0x00000040) == 0x00000040)) { + if (((bitField0_ & 0x00000080) == 0x00000080)) { size += com.google.protobuf.CodedOutputStream .computeInt32Size(10, attemptNumber_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(11, dagId_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3989,6 +4028,11 @@ public boolean equals(final java.lang.Object obj) { result = result && getDagName() .equals(other.getDagName()); } + result = result && (hasDagId() == other.hasDagId()); + if (hasDagId()) { + result = result && (getDagId() + == other.getDagId()); + } result = result && (hasVertexName() == other.hasVertexName()); if (hasVertexName()) { result = result && getVertexName() @@ -4041,6 +4085,10 @@ public int hashCode() { hash = (37 * hash) + DAG_NAME_FIELD_NUMBER; hash = (53 * hash) + getDagName().hashCode(); } + if (hasDagId()) { + hash = (37 * hash) + DAG_ID_FIELD_NUMBER; + hash = (53 * hash) + getDagId(); + } if (hasVertexName()) { hash = (37 * hash) + VERTEX_NAME_FIELD_NUMBER; hash = (53 * hash) + getVertexName().hashCode(); @@ -4190,38 +4238,40 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000001); dagName_ = ""; bitField0_ = (bitField0_ & ~0x00000002); - vertexName_ = ""; + dagId_ = 0; bitField0_ = (bitField0_ & ~0x00000004); + vertexName_ = ""; + bitField0_ = (bitField0_ & ~0x00000008); if (processorDescriptorBuilder_ == null) { processorDescriptor_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto.getDefaultInstance(); } else { processorDescriptorBuilder_.clear(); } - bitField0_ = (bitField0_ & ~0x00000008); + bitField0_ = (bitField0_ & ~0x00000010); if (inputSpecsBuilder_ == null) { inputSpecs_ = java.util.Collections.emptyList(); - bitField0_ = 
(bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000020); } else { inputSpecsBuilder_.clear(); } if (outputSpecsBuilder_ == null) { outputSpecs_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000020); + bitField0_ = (bitField0_ & ~0x00000040); } else { outputSpecsBuilder_.clear(); } if (groupedInputSpecsBuilder_ == null) { groupedInputSpecs_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000040); + bitField0_ = (bitField0_ & ~0x00000080); } else { groupedInputSpecsBuilder_.clear(); } vertexParallelism_ = 0; - bitField0_ = (bitField0_ & ~0x00000080); - fragmentNumber_ = 0; bitField0_ = (bitField0_ & ~0x00000100); - attemptNumber_ = 0; + fragmentNumber_ = 0; bitField0_ = (bitField0_ & ~0x00000200); + attemptNumber_ = 0; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -4261,53 +4311,57 @@ public Builder clone() { if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - result.vertexName_ = vertexName_; + result.dagId_ = dagId_; if (((from_bitField0_ & 0x00000008) == 0x00000008)) { to_bitField0_ |= 0x00000008; } + result.vertexName_ = vertexName_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } if (processorDescriptorBuilder_ == null) { result.processorDescriptor_ = processorDescriptor_; } else { result.processorDescriptor_ = processorDescriptorBuilder_.build(); } if (inputSpecsBuilder_ == null) { - if (((bitField0_ & 0x00000010) == 0x00000010)) { + if (((bitField0_ & 0x00000020) == 0x00000020)) { inputSpecs_ = java.util.Collections.unmodifiableList(inputSpecs_); - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000020); } result.inputSpecs_ = inputSpecs_; } else { result.inputSpecs_ = inputSpecsBuilder_.build(); } if (outputSpecsBuilder_ == null) { - if (((bitField0_ & 0x00000020) == 0x00000020)) { + if (((bitField0_ & 0x00000040) == 0x00000040)) { outputSpecs_ = 
java.util.Collections.unmodifiableList(outputSpecs_); - bitField0_ = (bitField0_ & ~0x00000020); + bitField0_ = (bitField0_ & ~0x00000040); } result.outputSpecs_ = outputSpecs_; } else { result.outputSpecs_ = outputSpecsBuilder_.build(); } if (groupedInputSpecsBuilder_ == null) { - if (((bitField0_ & 0x00000040) == 0x00000040)) { + if (((bitField0_ & 0x00000080) == 0x00000080)) { groupedInputSpecs_ = java.util.Collections.unmodifiableList(groupedInputSpecs_); - bitField0_ = (bitField0_ & ~0x00000040); + bitField0_ = (bitField0_ & ~0x00000080); } result.groupedInputSpecs_ = groupedInputSpecs_; } else { result.groupedInputSpecs_ = groupedInputSpecsBuilder_.build(); } - if (((from_bitField0_ & 0x00000080) == 0x00000080)) { - to_bitField0_ |= 0x00000010; - } - result.vertexParallelism_ = vertexParallelism_; if (((from_bitField0_ & 0x00000100) == 0x00000100)) { to_bitField0_ |= 0x00000020; } - result.fragmentNumber_ = fragmentNumber_; + result.vertexParallelism_ = vertexParallelism_; if (((from_bitField0_ & 0x00000200) == 0x00000200)) { to_bitField0_ |= 0x00000040; } + result.fragmentNumber_ = fragmentNumber_; + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000080; + } result.attemptNumber_ = attemptNumber_; result.bitField0_ = to_bitField0_; onBuilt(); @@ -4335,8 +4389,11 @@ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtoc dagName_ = other.dagName_; onChanged(); } + if (other.hasDagId()) { + setDagId(other.getDagId()); + } if (other.hasVertexName()) { - bitField0_ |= 0x00000004; + bitField0_ |= 0x00000008; vertexName_ = other.vertexName_; onChanged(); } @@ -4347,7 +4404,7 @@ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtoc if (!other.inputSpecs_.isEmpty()) { if (inputSpecs_.isEmpty()) { inputSpecs_ = other.inputSpecs_; - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000020); } else { ensureInputSpecsIsMutable(); 
inputSpecs_.addAll(other.inputSpecs_); @@ -4360,7 +4417,7 @@ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtoc inputSpecsBuilder_.dispose(); inputSpecsBuilder_ = null; inputSpecs_ = other.inputSpecs_; - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000020); inputSpecsBuilder_ = com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? getInputSpecsFieldBuilder() : null; @@ -4373,7 +4430,7 @@ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtoc if (!other.outputSpecs_.isEmpty()) { if (outputSpecs_.isEmpty()) { outputSpecs_ = other.outputSpecs_; - bitField0_ = (bitField0_ & ~0x00000020); + bitField0_ = (bitField0_ & ~0x00000040); } else { ensureOutputSpecsIsMutable(); outputSpecs_.addAll(other.outputSpecs_); @@ -4386,7 +4443,7 @@ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtoc outputSpecsBuilder_.dispose(); outputSpecsBuilder_ = null; outputSpecs_ = other.outputSpecs_; - bitField0_ = (bitField0_ & ~0x00000020); + bitField0_ = (bitField0_ & ~0x00000040); outputSpecsBuilder_ = com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? 
getOutputSpecsFieldBuilder() : null; @@ -4399,7 +4456,7 @@ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtoc if (!other.groupedInputSpecs_.isEmpty()) { if (groupedInputSpecs_.isEmpty()) { groupedInputSpecs_ = other.groupedInputSpecs_; - bitField0_ = (bitField0_ & ~0x00000040); + bitField0_ = (bitField0_ & ~0x00000080); } else { ensureGroupedInputSpecsIsMutable(); groupedInputSpecs_.addAll(other.groupedInputSpecs_); @@ -4412,7 +4469,7 @@ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtoc groupedInputSpecsBuilder_.dispose(); groupedInputSpecsBuilder_ = null; groupedInputSpecs_ = other.groupedInputSpecs_; - bitField0_ = (bitField0_ & ~0x00000040); + bitField0_ = (bitField0_ & ~0x00000080); groupedInputSpecsBuilder_ = com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? getGroupedInputSpecsFieldBuilder() : null; @@ -4605,13 +4662,46 @@ public Builder setDagNameBytes( return this; } + // optional int32 dag_id = 11; + private int dagId_ ; + /** + * optional int32 dag_id = 11; + */ + public boolean hasDagId() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int32 dag_id = 11; + */ + public int getDagId() { + return dagId_; + } + /** + * optional int32 dag_id = 11; + */ + public Builder setDagId(int value) { + bitField0_ |= 0x00000004; + dagId_ = value; + onChanged(); + return this; + } + /** + * optional int32 dag_id = 11; + */ + public Builder clearDagId() { + bitField0_ = (bitField0_ & ~0x00000004); + dagId_ = 0; + onChanged(); + return this; + } + // optional string vertex_name = 3; private java.lang.Object vertexName_ = ""; /** * optional string vertex_name = 3; */ public boolean hasVertexName() { - return ((bitField0_ & 0x00000004) == 0x00000004); + return ((bitField0_ & 0x00000008) == 0x00000008); } /** * optional string vertex_name = 3; @@ -4651,7 +4741,7 @@ public Builder setVertexName( if (value == null) { throw new NullPointerException(); } - bitField0_ |= 
0x00000004; + bitField0_ |= 0x00000008; vertexName_ = value; onChanged(); return this; @@ -4660,7 +4750,7 @@ public Builder setVertexName( * optional string vertex_name = 3; */ public Builder clearVertexName() { - bitField0_ = (bitField0_ & ~0x00000004); + bitField0_ = (bitField0_ & ~0x00000008); vertexName_ = getDefaultInstance().getVertexName(); onChanged(); return this; @@ -4673,7 +4763,7 @@ public Builder setVertexNameBytes( if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000004; + bitField0_ |= 0x00000008; vertexName_ = value; onChanged(); return this; @@ -4687,7 +4777,7 @@ public Builder setVertexNameBytes( * optional .EntityDescriptorProto processor_descriptor = 4; */ public boolean hasProcessorDescriptor() { - return ((bitField0_ & 0x00000008) == 0x00000008); + return ((bitField0_ & 0x00000010) == 0x00000010); } /** * optional .EntityDescriptorProto processor_descriptor = 4; @@ -4712,7 +4802,7 @@ public Builder setProcessorDescriptor(org.apache.hadoop.hive.llap.daemon.rpc.Lla } else { processorDescriptorBuilder_.setMessage(value); } - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000010; return this; } /** @@ -4726,7 +4816,7 @@ public Builder setProcessorDescriptor( } else { processorDescriptorBuilder_.setMessage(builderForValue.build()); } - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000010; return this; } /** @@ -4734,7 +4824,7 @@ public Builder setProcessorDescriptor( */ public Builder mergeProcessorDescriptor(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto value) { if (processorDescriptorBuilder_ == null) { - if (((bitField0_ & 0x00000008) == 0x00000008) && + if (((bitField0_ & 0x00000010) == 0x00000010) && processorDescriptor_ != org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto.getDefaultInstance()) { processorDescriptor_ = 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto.newBuilder(processorDescriptor_).mergeFrom(value).buildPartial(); @@ -4745,7 +4835,7 @@ public Builder mergeProcessorDescriptor(org.apache.hadoop.hive.llap.daemon.rpc.L } else { processorDescriptorBuilder_.mergeFrom(value); } - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000010; return this; } /** @@ -4758,14 +4848,14 @@ public Builder clearProcessorDescriptor() { } else { processorDescriptorBuilder_.clear(); } - bitField0_ = (bitField0_ & ~0x00000008); + bitField0_ = (bitField0_ & ~0x00000010); return this; } /** * optional .EntityDescriptorProto processor_descriptor = 4; */ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto.Builder getProcessorDescriptorBuilder() { - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000010; onChanged(); return getProcessorDescriptorFieldBuilder().getBuilder(); } @@ -4800,9 +4890,9 @@ public Builder clearProcessorDescriptor() { private java.util.List inputSpecs_ = java.util.Collections.emptyList(); private void ensureInputSpecsIsMutable() { - if (!((bitField0_ & 0x00000010) == 0x00000010)) { + if (!((bitField0_ & 0x00000020) == 0x00000020)) { inputSpecs_ = new java.util.ArrayList(inputSpecs_); - bitField0_ |= 0x00000010; + bitField0_ |= 0x00000020; } } @@ -4951,7 +5041,7 @@ public Builder addAllInputSpecs( public Builder clearInputSpecs() { if (inputSpecsBuilder_ == null) { inputSpecs_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000020); onChanged(); } else { inputSpecsBuilder_.clear(); @@ -5028,7 +5118,7 @@ public Builder removeInputSpecs(int index) { inputSpecsBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto.Builder, 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProtoOrBuilder>( inputSpecs_, - ((bitField0_ & 0x00000010) == 0x00000010), + ((bitField0_ & 0x00000020) == 0x00000020), getParentForChildren(), isClean()); inputSpecs_ = null; @@ -5040,9 +5130,9 @@ public Builder removeInputSpecs(int index) { private java.util.List outputSpecs_ = java.util.Collections.emptyList(); private void ensureOutputSpecsIsMutable() { - if (!((bitField0_ & 0x00000020) == 0x00000020)) { + if (!((bitField0_ & 0x00000040) == 0x00000040)) { outputSpecs_ = new java.util.ArrayList(outputSpecs_); - bitField0_ |= 0x00000020; + bitField0_ |= 0x00000040; } } @@ -5191,7 +5281,7 @@ public Builder addAllOutputSpecs( public Builder clearOutputSpecs() { if (outputSpecsBuilder_ == null) { outputSpecs_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000020); + bitField0_ = (bitField0_ & ~0x00000040); onChanged(); } else { outputSpecsBuilder_.clear(); @@ -5268,7 +5358,7 @@ public Builder removeOutputSpecs(int index) { outputSpecsBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProtoOrBuilder>( outputSpecs_, - ((bitField0_ & 0x00000020) == 0x00000020), + ((bitField0_ & 0x00000040) == 0x00000040), getParentForChildren(), isClean()); outputSpecs_ = null; @@ -5280,9 +5370,9 @@ public Builder removeOutputSpecs(int index) { private java.util.List groupedInputSpecs_ = java.util.Collections.emptyList(); private void ensureGroupedInputSpecsIsMutable() { - if (!((bitField0_ & 0x00000040) == 0x00000040)) { + if (!((bitField0_ & 0x00000080) == 0x00000080)) { groupedInputSpecs_ = new java.util.ArrayList(groupedInputSpecs_); - bitField0_ |= 0x00000040; + bitField0_ |= 0x00000080; } } @@ -5431,7 +5521,7 @@ public Builder addAllGroupedInputSpecs( 
public Builder clearGroupedInputSpecs() { if (groupedInputSpecsBuilder_ == null) { groupedInputSpecs_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000040); + bitField0_ = (bitField0_ & ~0x00000080); onChanged(); } else { groupedInputSpecsBuilder_.clear(); @@ -5508,7 +5598,7 @@ public Builder removeGroupedInputSpecs(int index) { groupedInputSpecsBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProtoOrBuilder>( groupedInputSpecs_, - ((bitField0_ & 0x00000040) == 0x00000040), + ((bitField0_ & 0x00000080) == 0x00000080), getParentForChildren(), isClean()); groupedInputSpecs_ = null; @@ -5522,7 +5612,7 @@ public Builder removeGroupedInputSpecs(int index) { * optional int32 vertex_parallelism = 8; */ public boolean hasVertexParallelism() { - return ((bitField0_ & 0x00000080) == 0x00000080); + return ((bitField0_ & 0x00000100) == 0x00000100); } /** * optional int32 vertex_parallelism = 8; @@ -5534,7 +5624,7 @@ public int getVertexParallelism() { * optional int32 vertex_parallelism = 8; */ public Builder setVertexParallelism(int value) { - bitField0_ |= 0x00000080; + bitField0_ |= 0x00000100; vertexParallelism_ = value; onChanged(); return this; @@ -5543,7 +5633,7 @@ public Builder setVertexParallelism(int value) { * optional int32 vertex_parallelism = 8; */ public Builder clearVertexParallelism() { - bitField0_ = (bitField0_ & ~0x00000080); + bitField0_ = (bitField0_ & ~0x00000100); vertexParallelism_ = 0; onChanged(); return this; @@ -5555,7 +5645,7 @@ public Builder clearVertexParallelism() { * optional int32 fragment_number = 9; */ public boolean hasFragmentNumber() { - return ((bitField0_ & 0x00000100) == 0x00000100); + return ((bitField0_ & 0x00000200) == 0x00000200); } /** * 
optional int32 fragment_number = 9; @@ -5567,7 +5657,7 @@ public int getFragmentNumber() { * optional int32 fragment_number = 9; */ public Builder setFragmentNumber(int value) { - bitField0_ |= 0x00000100; + bitField0_ |= 0x00000200; fragmentNumber_ = value; onChanged(); return this; @@ -5576,7 +5666,7 @@ public Builder setFragmentNumber(int value) { * optional int32 fragment_number = 9; */ public Builder clearFragmentNumber() { - bitField0_ = (bitField0_ & ~0x00000100); + bitField0_ = (bitField0_ & ~0x00000200); fragmentNumber_ = 0; onChanged(); return this; @@ -5588,7 +5678,7 @@ public Builder clearFragmentNumber() { * optional int32 attempt_number = 10; */ public boolean hasAttemptNumber() { - return ((bitField0_ & 0x00000200) == 0x00000200); + return ((bitField0_ & 0x00000400) == 0x00000400); } /** * optional int32 attempt_number = 10; @@ -5600,7 +5690,7 @@ public int getAttemptNumber() { * optional int32 attempt_number = 10; */ public Builder setAttemptNumber(int value) { - bitField0_ |= 0x00000200; + bitField0_ |= 0x00000400; attemptNumber_ = value; onChanged(); return this; @@ -5609,7 +5699,7 @@ public Builder setAttemptNumber(int value) { * optional int32 attempt_number = 10; */ public Builder clearAttemptNumber() { - bitField0_ = (bitField0_ & ~0x00000200); + bitField0_ = (bitField0_ & ~0x00000400); attemptNumber_ = 0; onChanged(); return this; @@ -6510,76 +6600,675 @@ public Builder clearCurrentAttemptStartTime() { // @@protoc_insertion_point(class_scope:FragmentRuntimeInfo) } - public interface SubmitWorkRequestProtoOrBuilder + public interface QueryIdentifierProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { - // optional string container_id_string = 1; - /** - * optional string container_id_string = 1; - */ - boolean hasContainerIdString(); - /** - * optional string container_id_string = 1; - */ - java.lang.String getContainerIdString(); - /** - * optional string container_id_string = 1; - */ - com.google.protobuf.ByteString - 
getContainerIdStringBytes(); - - // optional string am_host = 2; + // optional string app_identifier = 1; /** - * optional string am_host = 2; + * optional string app_identifier = 1; */ - boolean hasAmHost(); + boolean hasAppIdentifier(); /** - * optional string am_host = 2; + * optional string app_identifier = 1; */ - java.lang.String getAmHost(); + java.lang.String getAppIdentifier(); /** - * optional string am_host = 2; + * optional string app_identifier = 1; */ com.google.protobuf.ByteString - getAmHostBytes(); + getAppIdentifierBytes(); - // optional int32 am_port = 3; + // optional int32 dag_identifier = 2; /** - * optional int32 am_port = 3; + * optional int32 dag_identifier = 2; */ - boolean hasAmPort(); + boolean hasDagIdentifier(); /** - * optional int32 am_port = 3; + * optional int32 dag_identifier = 2; */ - int getAmPort(); + int getDagIdentifier(); + } + /** + * Protobuf type {@code QueryIdentifierProto} + */ + public static final class QueryIdentifierProto extends + com.google.protobuf.GeneratedMessage + implements QueryIdentifierProtoOrBuilder { + // Use QueryIdentifierProto.newBuilder() to construct. 
+ private QueryIdentifierProto(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private QueryIdentifierProto(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } - // optional string token_identifier = 4; - /** - * optional string token_identifier = 4; - */ - boolean hasTokenIdentifier(); - /** - * optional string token_identifier = 4; - */ - java.lang.String getTokenIdentifier(); - /** - * optional string token_identifier = 4; - */ - com.google.protobuf.ByteString - getTokenIdentifierBytes(); + private static final QueryIdentifierProto defaultInstance; + public static QueryIdentifierProto getDefaultInstance() { + return defaultInstance; + } - // optional bytes credentials_binary = 5; - /** - * optional bytes credentials_binary = 5; - */ - boolean hasCredentialsBinary(); - /** - * optional bytes credentials_binary = 5; - */ - com.google.protobuf.ByteString getCredentialsBinary(); + public QueryIdentifierProto getDefaultInstanceForType() { + return defaultInstance; + } - // optional string user = 6; - /** + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private QueryIdentifierProto( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; 
+ appIdentifier_ = input.readBytes(); + break; + } + case 16: { + bitField0_ |= 0x00000002; + dagIdentifier_ = input.readInt32(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_QueryIdentifierProto_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_QueryIdentifierProto_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public QueryIdentifierProto parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new QueryIdentifierProto(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // optional string app_identifier = 1; + public static final int APP_IDENTIFIER_FIELD_NUMBER = 1; + private java.lang.Object appIdentifier_; + /** + * optional string app_identifier = 1; + */ + public boolean hasAppIdentifier() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * 
optional string app_identifier = 1; + */ + public java.lang.String getAppIdentifier() { + java.lang.Object ref = appIdentifier_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + appIdentifier_ = s; + } + return s; + } + } + /** + * optional string app_identifier = 1; + */ + public com.google.protobuf.ByteString + getAppIdentifierBytes() { + java.lang.Object ref = appIdentifier_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + appIdentifier_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional int32 dag_identifier = 2; + public static final int DAG_IDENTIFIER_FIELD_NUMBER = 2; + private int dagIdentifier_; + /** + * optional int32 dag_identifier = 2; + */ + public boolean hasDagIdentifier() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional int32 dag_identifier = 2; + */ + public int getDagIdentifier() { + return dagIdentifier_; + } + + private void initFields() { + appIdentifier_ = ""; + dagIdentifier_ = 0; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getAppIdentifierBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeInt32(2, dagIdentifier_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = 
memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getAppIdentifierBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(2, dagIdentifier_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto)) { + return super.equals(obj); + } + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto) obj; + + boolean result = true; + result = result && (hasAppIdentifier() == other.hasAppIdentifier()); + if (hasAppIdentifier()) { + result = result && getAppIdentifier() + .equals(other.getAppIdentifier()); + } + result = result && (hasDagIdentifier() == other.hasDagIdentifier()); + if (hasDagIdentifier()) { + result = result && (getDagIdentifier() + == other.getDagIdentifier()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasAppIdentifier()) { + hash = (37 * hash) + APP_IDENTIFIER_FIELD_NUMBER; + hash = (53 * hash) + getAppIdentifier().hashCode(); + } + if (hasDagIdentifier()) { + hash = (37 * hash) 
+ DAG_IDENTIFIER_FIELD_NUMBER; + hash = (53 * hash) + getDagIdentifier(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return 
PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code QueryIdentifierProto} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_QueryIdentifierProto_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + 
internalGetFieldAccessorTable() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_QueryIdentifierProto_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder.class); + } + + // Construct using org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + appIdentifier_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + dagIdentifier_ = 0; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_QueryIdentifierProto_descriptor; + } + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getDefaultInstanceForType() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); + } + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto build() { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto buildPartial() { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.appIdentifier_ = appIdentifier_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.dagIdentifier_ = dagIdentifier_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto) { + return mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto other) { + if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance()) return this; + if (other.hasAppIdentifier()) { + bitField0_ |= 0x00000001; + appIdentifier_ = other.appIdentifier_; + onChanged(); + } + if (other.hasDagIdentifier()) { + setDagIdentifier(other.getDagIdentifier()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } 
catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional string app_identifier = 1; + private java.lang.Object appIdentifier_ = ""; + /** + * optional string app_identifier = 1; + */ + public boolean hasAppIdentifier() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional string app_identifier = 1; + */ + public java.lang.String getAppIdentifier() { + java.lang.Object ref = appIdentifier_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + appIdentifier_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string app_identifier = 1; + */ + public com.google.protobuf.ByteString + getAppIdentifierBytes() { + java.lang.Object ref = appIdentifier_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + appIdentifier_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string app_identifier = 1; + */ + public Builder setAppIdentifier( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + appIdentifier_ = value; + onChanged(); + return this; + } + /** + * optional string app_identifier = 1; + */ + public Builder clearAppIdentifier() { + bitField0_ = (bitField0_ & ~0x00000001); + appIdentifier_ = getDefaultInstance().getAppIdentifier(); + onChanged(); + return this; + } + /** + * optional string app_identifier = 1; + */ + public Builder setAppIdentifierBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new 
NullPointerException(); + } + bitField0_ |= 0x00000001; + appIdentifier_ = value; + onChanged(); + return this; + } + + // optional int32 dag_identifier = 2; + private int dagIdentifier_ ; + /** + * optional int32 dag_identifier = 2; + */ + public boolean hasDagIdentifier() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional int32 dag_identifier = 2; + */ + public int getDagIdentifier() { + return dagIdentifier_; + } + /** + * optional int32 dag_identifier = 2; + */ + public Builder setDagIdentifier(int value) { + bitField0_ |= 0x00000002; + dagIdentifier_ = value; + onChanged(); + return this; + } + /** + * optional int32 dag_identifier = 2; + */ + public Builder clearDagIdentifier() { + bitField0_ = (bitField0_ & ~0x00000002); + dagIdentifier_ = 0; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:QueryIdentifierProto) + } + + static { + defaultInstance = new QueryIdentifierProto(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:QueryIdentifierProto) + } + + public interface SubmitWorkRequestProtoOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional string container_id_string = 1; + /** + * optional string container_id_string = 1; + */ + boolean hasContainerIdString(); + /** + * optional string container_id_string = 1; + */ + java.lang.String getContainerIdString(); + /** + * optional string container_id_string = 1; + */ + com.google.protobuf.ByteString + getContainerIdStringBytes(); + + // optional string am_host = 2; + /** + * optional string am_host = 2; + */ + boolean hasAmHost(); + /** + * optional string am_host = 2; + */ + java.lang.String getAmHost(); + /** + * optional string am_host = 2; + */ + com.google.protobuf.ByteString + getAmHostBytes(); + + // optional int32 am_port = 3; + /** + * optional int32 am_port = 3; + */ + boolean hasAmPort(); + /** + * optional int32 am_port = 3; + */ + int getAmPort(); + + // optional string 
token_identifier = 4; + /** + * optional string token_identifier = 4; + */ + boolean hasTokenIdentifier(); + /** + * optional string token_identifier = 4; + */ + java.lang.String getTokenIdentifier(); + /** + * optional string token_identifier = 4; + */ + com.google.protobuf.ByteString + getTokenIdentifierBytes(); + + // optional bytes credentials_binary = 5; + /** + * optional bytes credentials_binary = 5; + */ + boolean hasCredentialsBinary(); + /** + * optional bytes credentials_binary = 5; + */ + com.google.protobuf.ByteString getCredentialsBinary(); + + // optional string user = 6; + /** * optional string user = 6; */ boolean hasUser(); @@ -8800,20 +9489,19 @@ public Builder clearSubmissionState() { public interface SourceStateUpdatedRequestProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { - // optional string dag_name = 1; + // optional .QueryIdentifierProto query_identifier = 1; /** - * optional string dag_name = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - boolean hasDagName(); + boolean hasQueryIdentifier(); /** - * optional string dag_name = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - java.lang.String getDagName(); + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getQueryIdentifier(); /** - * optional string dag_name = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - com.google.protobuf.ByteString - getDagNameBytes(); + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder getQueryIdentifierOrBuilder(); // optional string src_name = 2; /** @@ -8892,8 +9580,16 @@ private SourceStateUpdatedRequestProto( break; } case 10: { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder subBuilder = null; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + subBuilder = queryIdentifier_.toBuilder(); + } + queryIdentifier_ = 
input.readMessage(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(queryIdentifier_); + queryIdentifier_ = subBuilder.buildPartial(); + } bitField0_ |= 0x00000001; - dagName_ = input.readBytes(); break; } case 18: { @@ -8952,47 +9648,26 @@ public SourceStateUpdatedRequestProto parsePartialFrom( } private int bitField0_; - // optional string dag_name = 1; - public static final int DAG_NAME_FIELD_NUMBER = 1; - private java.lang.Object dagName_; + // optional .QueryIdentifierProto query_identifier = 1; + public static final int QUERY_IDENTIFIER_FIELD_NUMBER = 1; + private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto queryIdentifier_; /** - * optional string dag_name = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public boolean hasDagName() { + public boolean hasQueryIdentifier() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * optional string dag_name = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public java.lang.String getDagName() { - java.lang.Object ref = dagName_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - if (bs.isValidUtf8()) { - dagName_ = s; - } - return s; - } + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getQueryIdentifier() { + return queryIdentifier_; } /** - * optional string dag_name = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public com.google.protobuf.ByteString - getDagNameBytes() { - java.lang.Object ref = dagName_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - dagName_ = b; - return b; - } else { - return 
(com.google.protobuf.ByteString) ref; - } + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder getQueryIdentifierOrBuilder() { + return queryIdentifier_; } // optional string src_name = 2; @@ -9055,7 +9730,7 @@ public boolean hasState() { } private void initFields() { - dagName_ = ""; + queryIdentifier_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); srcName_ = ""; state_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED; } @@ -9072,7 +9747,7 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeBytes(1, getDagNameBytes()); + output.writeMessage(1, queryIdentifier_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getSrcNameBytes()); @@ -9091,7 +9766,7 @@ public int getSerializedSize() { size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(1, getDagNameBytes()); + .computeMessageSize(1, queryIdentifier_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream @@ -9124,10 +9799,10 @@ public boolean equals(final java.lang.Object obj) { org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto) obj; boolean result = true; - result = result && (hasDagName() == other.hasDagName()); - if (hasDagName()) { - result = result && getDagName() - .equals(other.getDagName()); + result = result && (hasQueryIdentifier() == other.hasQueryIdentifier()); + if (hasQueryIdentifier()) { + result = result && getQueryIdentifier() + .equals(other.getQueryIdentifier()); } result = result && (hasSrcName() == other.hasSrcName()); if 
(hasSrcName()) { @@ -9152,9 +9827,9 @@ public int hashCode() { } int hash = 41; hash = (19 * hash) + getDescriptorForType().hashCode(); - if (hasDagName()) { - hash = (37 * hash) + DAG_NAME_FIELD_NUMBER; - hash = (53 * hash) + getDagName().hashCode(); + if (hasQueryIdentifier()) { + hash = (37 * hash) + QUERY_IDENTIFIER_FIELD_NUMBER; + hash = (53 * hash) + getQueryIdentifier().hashCode(); } if (hasSrcName()) { hash = (37 * hash) + SRC_NAME_FIELD_NUMBER; @@ -9265,6 +9940,7 @@ private Builder( } private void maybeForceBuilderInitialization() { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getQueryIdentifierFieldBuilder(); } } private static Builder create() { @@ -9273,7 +9949,11 @@ private static Builder create() { public Builder clear() { super.clear(); - dagName_ = ""; + if (queryIdentifierBuilder_ == null) { + queryIdentifier_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); + } else { + queryIdentifierBuilder_.clear(); + } bitField0_ = (bitField0_ & ~0x00000001); srcName_ = ""; bitField0_ = (bitField0_ & ~0x00000002); @@ -9310,7 +9990,11 @@ public Builder clone() { if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } - result.dagName_ = dagName_; + if (queryIdentifierBuilder_ == null) { + result.queryIdentifier_ = queryIdentifier_; + } else { + result.queryIdentifier_ = queryIdentifierBuilder_.build(); + } if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } @@ -9335,10 +10019,8 @@ public Builder mergeFrom(com.google.protobuf.Message other) { public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto other) { if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.getDefaultInstance()) return this; - if (other.hasDagName()) { - bitField0_ |= 0x00000001; - dagName_ = other.dagName_; - onChanged(); + if 
(other.hasQueryIdentifier()) { + mergeQueryIdentifier(other.getQueryIdentifier()); } if (other.hasSrcName()) { bitField0_ |= 0x00000002; @@ -9375,78 +10057,121 @@ public Builder mergeFrom( } private int bitField0_; - // optional string dag_name = 1; - private java.lang.Object dagName_ = ""; + // optional .QueryIdentifierProto query_identifier = 1; + private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto queryIdentifier_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder> queryIdentifierBuilder_; /** - * optional string dag_name = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public boolean hasDagName() { + public boolean hasQueryIdentifier() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * optional string dag_name = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public java.lang.String getDagName() { - java.lang.Object ref = dagName_; - if (!(ref instanceof java.lang.String)) { - java.lang.String s = ((com.google.protobuf.ByteString) ref) - .toStringUtf8(); - dagName_ = s; - return s; + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getQueryIdentifier() { + if (queryIdentifierBuilder_ == null) { + return queryIdentifier_; } else { - return (java.lang.String) ref; + return queryIdentifierBuilder_.getMessage(); } } /** - * optional string dag_name = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public com.google.protobuf.ByteString - getDagNameBytes() { - java.lang.Object ref = dagName_; - if (ref instanceof String) { - 
com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - dagName_ = b; - return b; + public Builder setQueryIdentifier(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto value) { + if (queryIdentifierBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + queryIdentifier_ = value; + onChanged(); } else { - return (com.google.protobuf.ByteString) ref; + queryIdentifierBuilder_.setMessage(value); } + bitField0_ |= 0x00000001; + return this; } /** - * optional string dag_name = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public Builder setDagName( - java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000001; - dagName_ = value; - onChanged(); + public Builder setQueryIdentifier( + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder builderForValue) { + if (queryIdentifierBuilder_ == null) { + queryIdentifier_ = builderForValue.build(); + onChanged(); + } else { + queryIdentifierBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * optional .QueryIdentifierProto query_identifier = 1; + */ + public Builder mergeQueryIdentifier(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto value) { + if (queryIdentifierBuilder_ == null) { + if (((bitField0_ & 0x00000001) == 0x00000001) && + queryIdentifier_ != org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance()) { + queryIdentifier_ = + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.newBuilder(queryIdentifier_).mergeFrom(value).buildPartial(); + } else { + queryIdentifier_ = value; + } + onChanged(); + } else { + queryIdentifierBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * optional 
.QueryIdentifierProto query_identifier = 1; + */ + public Builder clearQueryIdentifier() { + if (queryIdentifierBuilder_ == null) { + queryIdentifier_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); + onChanged(); + } else { + queryIdentifierBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); return this; } /** - * optional string dag_name = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public Builder clearDagName() { - bitField0_ = (bitField0_ & ~0x00000001); - dagName_ = getDefaultInstance().getDagName(); + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder getQueryIdentifierBuilder() { + bitField0_ |= 0x00000001; onChanged(); - return this; + return getQueryIdentifierFieldBuilder().getBuilder(); } /** - * optional string dag_name = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public Builder setDagNameBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000001; - dagName_ = value; - onChanged(); - return this; + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder getQueryIdentifierOrBuilder() { + if (queryIdentifierBuilder_ != null) { + return queryIdentifierBuilder_.getMessageOrBuilder(); + } else { + return queryIdentifier_; + } + } + /** + * optional .QueryIdentifierProto query_identifier = 1; + */ + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder> + getQueryIdentifierFieldBuilder() { + if (queryIdentifierBuilder_ == null) { + queryIdentifierBuilder_ = new com.google.protobuf.SingleFieldBuilder< + 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder>( + queryIdentifier_, + getParentForChildren(), + isClean()); + queryIdentifier_ = null; + } + return queryIdentifierBuilder_; } // optional string src_name = 2; @@ -9926,28 +10651,27 @@ public Builder mergeFrom( com.google.protobuf.ByteString getQueryIdBytes(); - // optional string dag_name = 2; + // optional .QueryIdentifierProto query_identifier = 2; /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 2; */ - boolean hasDagName(); + boolean hasQueryIdentifier(); /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 2; */ - java.lang.String getDagName(); + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getQueryIdentifier(); /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 2; */ - com.google.protobuf.ByteString - getDagNameBytes(); + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder getQueryIdentifierOrBuilder(); - // optional int64 delete_delay = 3 [default = 0]; + // optional int64 delete_delay = 4 [default = 0]; /** - * optional int64 delete_delay = 3 [default = 0]; + * optional int64 delete_delay = 4 [default = 0]; */ boolean hasDeleteDelay(); /** - * optional int64 delete_delay = 3 [default = 0]; + * optional int64 delete_delay = 4 [default = 0]; */ long getDeleteDelay(); } @@ -10008,11 +10732,19 @@ private QueryCompleteRequestProto( break; } case 18: { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder subBuilder = null; + if (((bitField0_ & 0x00000002) == 0x00000002)) { + subBuilder = queryIdentifier_.toBuilder(); + } + queryIdentifier_ = 
input.readMessage(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(queryIdentifier_); + queryIdentifier_ = subBuilder.buildPartial(); + } bitField0_ |= 0x00000002; - dagName_ = input.readBytes(); break; } - case 24: { + case 32: { bitField0_ |= 0x00000004; deleteDelay_ = input.readInt64(); break; @@ -10100,60 +10832,39 @@ public boolean hasQueryId() { } } - // optional string dag_name = 2; - public static final int DAG_NAME_FIELD_NUMBER = 2; - private java.lang.Object dagName_; + // optional .QueryIdentifierProto query_identifier = 2; + public static final int QUERY_IDENTIFIER_FIELD_NUMBER = 2; + private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto queryIdentifier_; /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 2; */ - public boolean hasDagName() { + public boolean hasQueryIdentifier() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 2; */ - public java.lang.String getDagName() { - java.lang.Object ref = dagName_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - if (bs.isValidUtf8()) { - dagName_ = s; - } - return s; - } + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getQueryIdentifier() { + return queryIdentifier_; } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 2; */ - public com.google.protobuf.ByteString - getDagNameBytes() { - java.lang.Object ref = dagName_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - dagName_ = 
b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder getQueryIdentifierOrBuilder() { + return queryIdentifier_; } - // optional int64 delete_delay = 3 [default = 0]; - public static final int DELETE_DELAY_FIELD_NUMBER = 3; + // optional int64 delete_delay = 4 [default = 0]; + public static final int DELETE_DELAY_FIELD_NUMBER = 4; private long deleteDelay_; /** - * optional int64 delete_delay = 3 [default = 0]; + * optional int64 delete_delay = 4 [default = 0]; */ public boolean hasDeleteDelay() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * optional int64 delete_delay = 3 [default = 0]; + * optional int64 delete_delay = 4 [default = 0]; */ public long getDeleteDelay() { return deleteDelay_; @@ -10161,7 +10872,7 @@ public long getDeleteDelay() { private void initFields() { queryId_ = ""; - dagName_ = ""; + queryIdentifier_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); deleteDelay_ = 0L; } private byte memoizedIsInitialized = -1; @@ -10180,10 +10891,10 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) output.writeBytes(1, getQueryIdBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeBytes(2, getDagNameBytes()); + output.writeMessage(2, queryIdentifier_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeInt64(3, deleteDelay_); + output.writeInt64(4, deleteDelay_); } getUnknownFields().writeTo(output); } @@ -10200,11 +10911,11 @@ public int getSerializedSize() { } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(2, getDagNameBytes()); + .computeMessageSize(2, queryIdentifier_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream - .computeInt64Size(3, deleteDelay_); + .computeInt64Size(4, 
deleteDelay_); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -10234,10 +10945,10 @@ public boolean equals(final java.lang.Object obj) { result = result && getQueryId() .equals(other.getQueryId()); } - result = result && (hasDagName() == other.hasDagName()); - if (hasDagName()) { - result = result && getDagName() - .equals(other.getDagName()); + result = result && (hasQueryIdentifier() == other.hasQueryIdentifier()); + if (hasQueryIdentifier()) { + result = result && getQueryIdentifier() + .equals(other.getQueryIdentifier()); } result = result && (hasDeleteDelay() == other.hasDeleteDelay()); if (hasDeleteDelay()) { @@ -10261,9 +10972,9 @@ public int hashCode() { hash = (37 * hash) + QUERY_ID_FIELD_NUMBER; hash = (53 * hash) + getQueryId().hashCode(); } - if (hasDagName()) { - hash = (37 * hash) + DAG_NAME_FIELD_NUMBER; - hash = (53 * hash) + getDagName().hashCode(); + if (hasQueryIdentifier()) { + hash = (37 * hash) + QUERY_IDENTIFIER_FIELD_NUMBER; + hash = (53 * hash) + getQueryIdentifier().hashCode(); } if (hasDeleteDelay()) { hash = (37 * hash) + DELETE_DELAY_FIELD_NUMBER; @@ -10370,6 +11081,7 @@ private Builder( } private void maybeForceBuilderInitialization() { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getQueryIdentifierFieldBuilder(); } } private static Builder create() { @@ -10380,7 +11092,11 @@ public Builder clear() { super.clear(); queryId_ = ""; bitField0_ = (bitField0_ & ~0x00000001); - dagName_ = ""; + if (queryIdentifierBuilder_ == null) { + queryIdentifier_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); + } else { + queryIdentifierBuilder_.clear(); + } bitField0_ = (bitField0_ & ~0x00000002); deleteDelay_ = 0L; bitField0_ = (bitField0_ & ~0x00000004); @@ -10419,7 +11135,11 @@ public Builder clone() { if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } - result.dagName_ = dagName_; + if 
(queryIdentifierBuilder_ == null) { + result.queryIdentifier_ = queryIdentifier_; + } else { + result.queryIdentifier_ = queryIdentifierBuilder_.build(); + } if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } @@ -10445,10 +11165,8 @@ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtoc queryId_ = other.queryId_; onChanged(); } - if (other.hasDagName()) { - bitField0_ |= 0x00000002; - dagName_ = other.dagName_; - onChanged(); + if (other.hasQueryIdentifier()) { + mergeQueryIdentifier(other.getQueryIdentifier()); } if (other.hasDeleteDelay()) { setDeleteDelay(other.getDeleteDelay()); @@ -10554,96 +11272,139 @@ public Builder setQueryIdBytes( return this; } - // optional string dag_name = 2; - private java.lang.Object dagName_ = ""; + // optional .QueryIdentifierProto query_identifier = 2; + private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto queryIdentifier_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder> queryIdentifierBuilder_; /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 2; */ - public boolean hasDagName() { + public boolean hasQueryIdentifier() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 2; */ - public java.lang.String getDagName() { - java.lang.Object ref = dagName_; - if (!(ref instanceof java.lang.String)) { - java.lang.String s = ((com.google.protobuf.ByteString) ref) - .toStringUtf8(); - dagName_ = s; - return s; + public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getQueryIdentifier() { + if (queryIdentifierBuilder_ == null) { + return queryIdentifier_; } else { - return (java.lang.String) ref; + return queryIdentifierBuilder_.getMessage(); } } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 2; */ - public com.google.protobuf.ByteString - getDagNameBytes() { - java.lang.Object ref = dagName_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - dagName_ = b; - return b; + public Builder setQueryIdentifier(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto value) { + if (queryIdentifierBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + queryIdentifier_ = value; + onChanged(); } else { - return (com.google.protobuf.ByteString) ref; + queryIdentifierBuilder_.setMessage(value); } + bitField0_ |= 0x00000002; + return this; } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 2; */ - public Builder setDagName( - java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000002; - dagName_ = value; - onChanged(); + public Builder setQueryIdentifier( + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder builderForValue) { + if (queryIdentifierBuilder_ == null) { + queryIdentifier_ = builderForValue.build(); + onChanged(); + } else { + queryIdentifierBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000002; return this; } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 2; */ - public Builder clearDagName() { + public Builder mergeQueryIdentifier(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto value) { + if 
(queryIdentifierBuilder_ == null) { + if (((bitField0_ & 0x00000002) == 0x00000002) && + queryIdentifier_ != org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance()) { + queryIdentifier_ = + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.newBuilder(queryIdentifier_).mergeFrom(value).buildPartial(); + } else { + queryIdentifier_ = value; + } + onChanged(); + } else { + queryIdentifierBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * optional .QueryIdentifierProto query_identifier = 2; + */ + public Builder clearQueryIdentifier() { + if (queryIdentifierBuilder_ == null) { + queryIdentifier_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); + onChanged(); + } else { + queryIdentifierBuilder_.clear(); + } bitField0_ = (bitField0_ & ~0x00000002); - dagName_ = getDefaultInstance().getDagName(); - onChanged(); return this; } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 2; */ - public Builder setDagNameBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000002; - dagName_ = value; + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder getQueryIdentifierBuilder() { + bitField0_ |= 0x00000002; onChanged(); - return this; + return getQueryIdentifierFieldBuilder().getBuilder(); + } + /** + * optional .QueryIdentifierProto query_identifier = 2; + */ + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder getQueryIdentifierOrBuilder() { + if (queryIdentifierBuilder_ != null) { + return queryIdentifierBuilder_.getMessageOrBuilder(); + } else { + return queryIdentifier_; + } + } + /** + * optional .QueryIdentifierProto query_identifier = 2; + */ + private 
com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder> + getQueryIdentifierFieldBuilder() { + if (queryIdentifierBuilder_ == null) { + queryIdentifierBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder>( + queryIdentifier_, + getParentForChildren(), + isClean()); + queryIdentifier_ = null; + } + return queryIdentifierBuilder_; } - // optional int64 delete_delay = 3 [default = 0]; + // optional int64 delete_delay = 4 [default = 0]; private long deleteDelay_ ; /** - * optional int64 delete_delay = 3 [default = 0]; + * optional int64 delete_delay = 4 [default = 0]; */ public boolean hasDeleteDelay() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * optional int64 delete_delay = 3 [default = 0]; + * optional int64 delete_delay = 4 [default = 0]; */ public long getDeleteDelay() { return deleteDelay_; } /** - * optional int64 delete_delay = 3 [default = 0]; + * optional int64 delete_delay = 4 [default = 0]; */ public Builder setDeleteDelay(long value) { bitField0_ |= 0x00000004; @@ -10652,7 +11413,7 @@ public Builder setDeleteDelay(long value) { return this; } /** - * optional int64 delete_delay = 3 [default = 0]; + * optional int64 delete_delay = 4 [default = 0]; */ public Builder clearDeleteDelay() { bitField0_ = (bitField0_ & ~0x00000004); @@ -11013,47 +11774,31 @@ public Builder mergeFrom( public interface TerminateFragmentRequestProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { - // optional string query_id = 1; 
- /** - * optional string query_id = 1; - */ - boolean hasQueryId(); - /** - * optional string query_id = 1; - */ - java.lang.String getQueryId(); - /** - * optional string query_id = 1; - */ - com.google.protobuf.ByteString - getQueryIdBytes(); - - // optional string dag_name = 2; + // optional .QueryIdentifierProto query_identifier = 1; /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 1; */ - boolean hasDagName(); + boolean hasQueryIdentifier(); /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 1; */ - java.lang.String getDagName(); + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getQueryIdentifier(); /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 1; */ - com.google.protobuf.ByteString - getDagNameBytes(); + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder getQueryIdentifierOrBuilder(); - // optional string fragment_identifier_string = 7; + // optional string fragment_identifier_string = 2; /** - * optional string fragment_identifier_string = 7; + * optional string fragment_identifier_string = 2; */ boolean hasFragmentIdentifierString(); /** - * optional string fragment_identifier_string = 7; + * optional string fragment_identifier_string = 2; */ java.lang.String getFragmentIdentifierString(); /** - * optional string fragment_identifier_string = 7; + * optional string fragment_identifier_string = 2; */ com.google.protobuf.ByteString getFragmentIdentifierStringBytes(); @@ -11110,17 +11855,20 @@ private TerminateFragmentRequestProto( break; } case 10: { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder subBuilder = null; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + subBuilder = queryIdentifier_.toBuilder(); + } + queryIdentifier_ = 
input.readMessage(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(queryIdentifier_); + queryIdentifier_ = subBuilder.buildPartial(); + } bitField0_ |= 0x00000001; - queryId_ = input.readBytes(); break; } case 18: { bitField0_ |= 0x00000002; - dagName_ = input.readBytes(); - break; - } - case 58: { - bitField0_ |= 0x00000004; fragmentIdentifierString_ = input.readBytes(); break; } @@ -11158,109 +11906,45 @@ public TerminateFragmentRequestProto parsePartialFrom( } }; - @java.lang.Override - public com.google.protobuf.Parser getParserForType() { - return PARSER; - } - - private int bitField0_; - // optional string query_id = 1; - public static final int QUERY_ID_FIELD_NUMBER = 1; - private java.lang.Object queryId_; - /** - * optional string query_id = 1; - */ - public boolean hasQueryId() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - /** - * optional string query_id = 1; - */ - public java.lang.String getQueryId() { - java.lang.Object ref = queryId_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - if (bs.isValidUtf8()) { - queryId_ = s; - } - return s; - } - } - /** - * optional string query_id = 1; - */ - public com.google.protobuf.ByteString - getQueryIdBytes() { - java.lang.Object ref = queryId_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - queryId_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; } - // optional string dag_name = 2; - public static final int DAG_NAME_FIELD_NUMBER = 2; - private java.lang.Object dagName_; + private int bitField0_; + 
// optional .QueryIdentifierProto query_identifier = 1; + public static final int QUERY_IDENTIFIER_FIELD_NUMBER = 1; + private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto queryIdentifier_; /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 1; */ - public boolean hasDagName() { - return ((bitField0_ & 0x00000002) == 0x00000002); + public boolean hasQueryIdentifier() { + return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 1; */ - public java.lang.String getDagName() { - java.lang.Object ref = dagName_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - if (bs.isValidUtf8()) { - dagName_ = s; - } - return s; - } + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getQueryIdentifier() { + return queryIdentifier_; } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 1; */ - public com.google.protobuf.ByteString - getDagNameBytes() { - java.lang.Object ref = dagName_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - dagName_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder getQueryIdentifierOrBuilder() { + return queryIdentifier_; } - // optional string fragment_identifier_string = 7; - public static final int FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER = 7; + // optional string fragment_identifier_string = 2; + public static final int FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER = 2; private java.lang.Object fragmentIdentifierString_; /** - 
* optional string fragment_identifier_string = 7; + * optional string fragment_identifier_string = 2; */ public boolean hasFragmentIdentifierString() { - return ((bitField0_ & 0x00000004) == 0x00000004); + return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * optional string fragment_identifier_string = 7; + * optional string fragment_identifier_string = 2; */ public java.lang.String getFragmentIdentifierString() { java.lang.Object ref = fragmentIdentifierString_; @@ -11277,7 +11961,7 @@ public boolean hasFragmentIdentifierString() { } } /** - * optional string fragment_identifier_string = 7; + * optional string fragment_identifier_string = 2; */ public com.google.protobuf.ByteString getFragmentIdentifierStringBytes() { @@ -11294,8 +11978,7 @@ public boolean hasFragmentIdentifierString() { } private void initFields() { - queryId_ = ""; - dagName_ = ""; + queryIdentifier_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); fragmentIdentifierString_ = ""; } private byte memoizedIsInitialized = -1; @@ -11311,13 +11994,10 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeBytes(1, getQueryIdBytes()); + output.writeMessage(1, queryIdentifier_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeBytes(2, getDagNameBytes()); - } - if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeBytes(7, getFragmentIdentifierStringBytes()); + output.writeBytes(2, getFragmentIdentifierStringBytes()); } getUnknownFields().writeTo(output); } @@ -11330,15 +12010,11 @@ public int getSerializedSize() { size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(1, getQueryIdBytes()); + .computeMessageSize(1, queryIdentifier_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += 
com.google.protobuf.CodedOutputStream - .computeBytesSize(2, getDagNameBytes()); - } - if (((bitField0_ & 0x00000004) == 0x00000004)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(7, getFragmentIdentifierStringBytes()); + .computeBytesSize(2, getFragmentIdentifierStringBytes()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -11363,15 +12039,10 @@ public boolean equals(final java.lang.Object obj) { org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto) obj; boolean result = true; - result = result && (hasQueryId() == other.hasQueryId()); - if (hasQueryId()) { - result = result && getQueryId() - .equals(other.getQueryId()); - } - result = result && (hasDagName() == other.hasDagName()); - if (hasDagName()) { - result = result && getDagName() - .equals(other.getDagName()); + result = result && (hasQueryIdentifier() == other.hasQueryIdentifier()); + if (hasQueryIdentifier()) { + result = result && getQueryIdentifier() + .equals(other.getQueryIdentifier()); } result = result && (hasFragmentIdentifierString() == other.hasFragmentIdentifierString()); if (hasFragmentIdentifierString()) { @@ -11391,13 +12062,9 @@ public int hashCode() { } int hash = 41; hash = (19 * hash) + getDescriptorForType().hashCode(); - if (hasQueryId()) { - hash = (37 * hash) + QUERY_ID_FIELD_NUMBER; - hash = (53 * hash) + getQueryId().hashCode(); - } - if (hasDagName()) { - hash = (37 * hash) + DAG_NAME_FIELD_NUMBER; - hash = (53 * hash) + getDagName().hashCode(); + if (hasQueryIdentifier()) { + hash = (37 * hash) + QUERY_IDENTIFIER_FIELD_NUMBER; + hash = (53 * hash) + getQueryIdentifier().hashCode(); } if (hasFragmentIdentifierString()) { hash = (37 * hash) + FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER; @@ -11504,6 +12171,7 @@ private Builder( } private void maybeForceBuilderInitialization() { if 
(com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getQueryIdentifierFieldBuilder(); } } private static Builder create() { @@ -11512,12 +12180,14 @@ private static Builder create() { public Builder clear() { super.clear(); - queryId_ = ""; + if (queryIdentifierBuilder_ == null) { + queryIdentifier_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); + } else { + queryIdentifierBuilder_.clear(); + } bitField0_ = (bitField0_ & ~0x00000001); - dagName_ = ""; - bitField0_ = (bitField0_ & ~0x00000002); fragmentIdentifierString_ = ""; - bitField0_ = (bitField0_ & ~0x00000004); + bitField0_ = (bitField0_ & ~0x00000002); return this; } @@ -11549,14 +12219,14 @@ public Builder clone() { if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } - result.queryId_ = queryId_; + if (queryIdentifierBuilder_ == null) { + result.queryIdentifier_ = queryIdentifier_; + } else { + result.queryIdentifier_ = queryIdentifierBuilder_.build(); + } if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } - result.dagName_ = dagName_; - if (((from_bitField0_ & 0x00000004) == 0x00000004)) { - to_bitField0_ |= 0x00000004; - } result.fragmentIdentifierString_ = fragmentIdentifierString_; result.bitField0_ = to_bitField0_; onBuilt(); @@ -11574,18 +12244,11 @@ public Builder mergeFrom(com.google.protobuf.Message other) { public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto other) { if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto.getDefaultInstance()) return this; - if (other.hasQueryId()) { - bitField0_ |= 0x00000001; - queryId_ = other.queryId_; - onChanged(); - } - if (other.hasDagName()) { - bitField0_ |= 0x00000002; - dagName_ = other.dagName_; - onChanged(); + if (other.hasQueryIdentifier()) { + 
mergeQueryIdentifier(other.getQueryIdentifier()); } if (other.hasFragmentIdentifierString()) { - bitField0_ |= 0x00000004; + bitField0_ |= 0x00000002; fragmentIdentifierString_ = other.fragmentIdentifierString_; onChanged(); } @@ -11616,164 +12279,133 @@ public Builder mergeFrom( } private int bitField0_; - // optional string query_id = 1; - private java.lang.Object queryId_ = ""; + // optional .QueryIdentifierProto query_identifier = 1; + private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto queryIdentifier_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder> queryIdentifierBuilder_; /** - * optional string query_id = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public boolean hasQueryId() { + public boolean hasQueryIdentifier() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * optional string query_id = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public java.lang.String getQueryId() { - java.lang.Object ref = queryId_; - if (!(ref instanceof java.lang.String)) { - java.lang.String s = ((com.google.protobuf.ByteString) ref) - .toStringUtf8(); - queryId_ = s; - return s; + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getQueryIdentifier() { + if (queryIdentifierBuilder_ == null) { + return queryIdentifier_; } else { - return (java.lang.String) ref; + return queryIdentifierBuilder_.getMessage(); } } /** - * optional string query_id = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public com.google.protobuf.ByteString - getQueryIdBytes() 
{ - java.lang.Object ref = queryId_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - queryId_ = b; - return b; + public Builder setQueryIdentifier(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto value) { + if (queryIdentifierBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + queryIdentifier_ = value; + onChanged(); } else { - return (com.google.protobuf.ByteString) ref; + queryIdentifierBuilder_.setMessage(value); } - } - /** - * optional string query_id = 1; - */ - public Builder setQueryId( - java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000001; - queryId_ = value; - onChanged(); - return this; - } - /** - * optional string query_id = 1; - */ - public Builder clearQueryId() { - bitField0_ = (bitField0_ & ~0x00000001); - queryId_ = getDefaultInstance().getQueryId(); - onChanged(); + bitField0_ |= 0x00000001; return this; } /** - * optional string query_id = 1; + * optional .QueryIdentifierProto query_identifier = 1; */ - public Builder setQueryIdBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000001; - queryId_ = value; - onChanged(); + public Builder setQueryIdentifier( + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder builderForValue) { + if (queryIdentifierBuilder_ == null) { + queryIdentifier_ = builderForValue.build(); + onChanged(); + } else { + queryIdentifierBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000001; return this; } - - // optional string dag_name = 2; - private java.lang.Object dagName_ = ""; - /** - * optional string dag_name = 2; - */ - public boolean hasDagName() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } /** - * optional string dag_name = 2; + * 
optional .QueryIdentifierProto query_identifier = 1; */ - public java.lang.String getDagName() { - java.lang.Object ref = dagName_; - if (!(ref instanceof java.lang.String)) { - java.lang.String s = ((com.google.protobuf.ByteString) ref) - .toStringUtf8(); - dagName_ = s; - return s; + public Builder mergeQueryIdentifier(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto value) { + if (queryIdentifierBuilder_ == null) { + if (((bitField0_ & 0x00000001) == 0x00000001) && + queryIdentifier_ != org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance()) { + queryIdentifier_ = + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.newBuilder(queryIdentifier_).mergeFrom(value).buildPartial(); + } else { + queryIdentifier_ = value; + } + onChanged(); } else { - return (java.lang.String) ref; + queryIdentifierBuilder_.mergeFrom(value); } + bitField0_ |= 0x00000001; + return this; } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 1; */ - public com.google.protobuf.ByteString - getDagNameBytes() { - java.lang.Object ref = dagName_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - dagName_ = b; - return b; + public Builder clearQueryIdentifier() { + if (queryIdentifierBuilder_ == null) { + queryIdentifier_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); + onChanged(); } else { - return (com.google.protobuf.ByteString) ref; + queryIdentifierBuilder_.clear(); } + bitField0_ = (bitField0_ & ~0x00000001); + return this; } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 1; */ - public Builder setDagName( - java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000002; - 
dagName_ = value; + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder getQueryIdentifierBuilder() { + bitField0_ |= 0x00000001; onChanged(); - return this; + return getQueryIdentifierFieldBuilder().getBuilder(); } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 1; */ - public Builder clearDagName() { - bitField0_ = (bitField0_ & ~0x00000002); - dagName_ = getDefaultInstance().getDagName(); - onChanged(); - return this; + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder getQueryIdentifierOrBuilder() { + if (queryIdentifierBuilder_ != null) { + return queryIdentifierBuilder_.getMessageOrBuilder(); + } else { + return queryIdentifier_; + } } /** - * optional string dag_name = 2; + * optional .QueryIdentifierProto query_identifier = 1; */ - public Builder setDagNameBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000002; - dagName_ = value; - onChanged(); - return this; + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder> + getQueryIdentifierFieldBuilder() { + if (queryIdentifierBuilder_ == null) { + queryIdentifierBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder>( + queryIdentifier_, + getParentForChildren(), + isClean()); + queryIdentifier_ = null; + } + return queryIdentifierBuilder_; } - // 
optional string fragment_identifier_string = 7; + // optional string fragment_identifier_string = 2; private java.lang.Object fragmentIdentifierString_ = ""; /** - * optional string fragment_identifier_string = 7; + * optional string fragment_identifier_string = 2; */ public boolean hasFragmentIdentifierString() { - return ((bitField0_ & 0x00000004) == 0x00000004); + return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * optional string fragment_identifier_string = 7; + * optional string fragment_identifier_string = 2; */ public java.lang.String getFragmentIdentifierString() { java.lang.Object ref = fragmentIdentifierString_; @@ -11787,7 +12419,7 @@ public boolean hasFragmentIdentifierString() { } } /** - * optional string fragment_identifier_string = 7; + * optional string fragment_identifier_string = 2; */ public com.google.protobuf.ByteString getFragmentIdentifierStringBytes() { @@ -11803,36 +12435,36 @@ public boolean hasFragmentIdentifierString() { } } /** - * optional string fragment_identifier_string = 7; + * optional string fragment_identifier_string = 2; */ public Builder setFragmentIdentifierString( java.lang.String value) { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000004; + bitField0_ |= 0x00000002; fragmentIdentifierString_ = value; onChanged(); return this; } /** - * optional string fragment_identifier_string = 7; + * optional string fragment_identifier_string = 2; */ public Builder clearFragmentIdentifierString() { - bitField0_ = (bitField0_ & ~0x00000004); + bitField0_ = (bitField0_ & ~0x00000002); fragmentIdentifierString_ = getDefaultInstance().getFragmentIdentifierString(); onChanged(); return this; } /** - * optional string fragment_identifier_string = 7; + * optional string fragment_identifier_string = 2; */ public Builder setFragmentIdentifierStringBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000004; + bitField0_ |= 
0x00000002; fragmentIdentifierString_ = value; onChanged(); return this; @@ -13670,6 +14302,11 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_FragmentRuntimeInfo_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor + internal_static_QueryIdentifierProto_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_QueryIdentifierProto_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor internal_static_SubmitWorkRequestProto_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable @@ -13739,58 +14376,62 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { "roupInputSpecProto\022\022\n\ngroup_name\030\001 \001(\t\022\026" + "\n\016group_vertices\030\002 \003(\t\0227\n\027merged_input_d", "escriptor\030\003 \001(\0132\026.EntityDescriptorProto\"" + - "\333\002\n\021FragmentSpecProto\022\"\n\032fragment_identi" + - "fier_string\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t\022\023\n\013v" + - "ertex_name\030\003 \001(\t\0224\n\024processor_descriptor" + - "\030\004 \001(\0132\026.EntityDescriptorProto\022!\n\013input_" + - "specs\030\005 \003(\0132\014.IOSpecProto\022\"\n\014output_spec" + - "s\030\006 \003(\0132\014.IOSpecProto\0221\n\023grouped_input_s" + - "pecs\030\007 \003(\0132\024.GroupInputSpecProto\022\032\n\022vert" + - "ex_parallelism\030\010 \001(\005\022\027\n\017fragment_number\030" + - "\t \001(\005\022\026\n\016attempt_number\030\n \001(\005\"\344\001\n\023Fragme", - "ntRuntimeInfo\022#\n\033num_self_and_upstream_t" + - "asks\030\001 \001(\005\022-\n%num_self_and_upstream_comp" + - "leted_tasks\030\002 \001(\005\022\033\n\023within_dag_priority" + - "\030\003 \001(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030first_" + - "attempt_start_time\030\005 \001(\003\022\"\n\032current_atte" + - 
"mpt_start_time\030\006 \001(\003\"\266\002\n\026SubmitWorkReque" + - "stProto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007" + - "am_host\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_" + - "identifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005" + - " \001(\014\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_str", - "ing\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n" + - "\rfragment_spec\030\t \001(\0132\022.FragmentSpecProto" + - "\0223\n\025fragment_runtime_info\030\n \001(\0132\024.Fragme" + - "ntRuntimeInfo\"J\n\027SubmitWorkResponseProto" + - "\022/\n\020submission_state\030\001 \001(\0162\025.SubmissionS" + - "tateProto\"f\n\036SourceStateUpdatedRequestPr" + - "oto\022\020\n\010dag_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022" + + "\353\002\n\021FragmentSpecProto\022\"\n\032fragment_identi" + + "fier_string\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t\022\016\n\006d" + + "ag_id\030\013 \001(\005\022\023\n\013vertex_name\030\003 \001(\t\0224\n\024proc" + + "essor_descriptor\030\004 \001(\0132\026.EntityDescripto" + + "rProto\022!\n\013input_specs\030\005 \003(\0132\014.IOSpecProt" + + "o\022\"\n\014output_specs\030\006 \003(\0132\014.IOSpecProto\0221\n" + + "\023grouped_input_specs\030\007 \003(\0132\024.GroupInputS" + + "pecProto\022\032\n\022vertex_parallelism\030\010 \001(\005\022\027\n\017" + + "fragment_number\030\t \001(\005\022\026\n\016attempt_number\030", + "\n \001(\005\"\344\001\n\023FragmentRuntimeInfo\022#\n\033num_sel" + + "f_and_upstream_tasks\030\001 \001(\005\022-\n%num_self_a" + + "nd_upstream_completed_tasks\030\002 \001(\005\022\033\n\023wit" + + "hin_dag_priority\030\003 \001(\005\022\026\n\016dag_start_time" + + "\030\004 \001(\003\022 \n\030first_attempt_start_time\030\005 \001(\003" + + "\022\"\n\032current_attempt_start_time\030\006 \001(\003\"F\n\024" + + 
"QueryIdentifierProto\022\026\n\016app_identifier\030\001" + + " \001(\t\022\026\n\016dag_identifier\030\002 \001(\005\"\266\002\n\026SubmitW" + + "orkRequestProto\022\033\n\023container_id_string\030\001" + + " \001(\t\022\017\n\007am_host\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030", + "\n\020token_identifier\030\004 \001(\t\022\032\n\022credentials_" + + "binary\030\005 \001(\014\022\014\n\004user\030\006 \001(\t\022\035\n\025applicatio" + + "n_id_string\030\007 \001(\t\022\032\n\022app_attempt_number\030" + + "\010 \001(\005\022)\n\rfragment_spec\030\t \001(\0132\022.FragmentS" + + "pecProto\0223\n\025fragment_runtime_info\030\n \001(\0132" + + "\024.FragmentRuntimeInfo\"J\n\027SubmitWorkRespo" + + "nseProto\022/\n\020submission_state\030\001 \001(\0162\025.Sub" + + "missionStateProto\"\205\001\n\036SourceStateUpdated" + + "RequestProto\022/\n\020query_identifier\030\001 \001(\0132\025" + + ".QueryIdentifierProto\022\020\n\010src_name\030\002 \001(\t\022", " \n\005state\030\003 \001(\0162\021.SourceStateProto\"!\n\037Sou" + - "rceStateUpdatedResponseProto\"X\n\031QueryCom" + - "pleteRequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010d", - "ag_name\030\002 \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034" + - "\n\032QueryCompleteResponseProto\"g\n\035Terminat" + - "eFragmentRequestProto\022\020\n\010query_id\030\001 \001(\t\022" + - "\020\n\010dag_name\030\002 \001(\t\022\"\n\032fragment_identifier" + - "_string\030\007 \001(\t\" \n\036TerminateFragmentRespon" + - "seProto\"\026\n\024GetTokenRequestProto\"&\n\025GetTo" + - "kenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020Sourc" + - "eStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNIN" + - "G\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEPTED\020" + - "\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022L", - "lapDaemonProtocol\022?\n\nsubmitWork\022\027.Submit" + - 
"WorkRequestProto\032\030.SubmitWorkResponsePro" + - "to\022W\n\022sourceStateUpdated\022\037.SourceStateUp" + - "datedRequestProto\032 .SourceStateUpdatedRe" + - "sponseProto\022H\n\rqueryComplete\022\032.QueryComp" + - "leteRequestProto\032\033.QueryCompleteResponse" + - "Proto\022T\n\021terminateFragment\022\036.TerminateFr" + - "agmentRequestProto\032\037.TerminateFragmentRe" + - "sponseProto2]\n\026LlapManagementProtocol\022C\n" + - "\022getDelegationToken\022\025.GetTokenRequestPro", - "to\032\026.GetTokenResponseProtoBH\n&org.apache" + - ".hadoop.hive.llap.daemon.rpcB\030LlapDaemon" + - "ProtocolProtos\210\001\001\240\001\001" + "rceStateUpdatedResponseProto\"w\n\031QueryCom" + + "pleteRequestProto\022\020\n\010query_id\030\001 \001(\t\022/\n\020q" + + "uery_identifier\030\002 \001(\0132\025.QueryIdentifierP" + + "roto\022\027\n\014delete_delay\030\004 \001(\003:\0010\"\034\n\032QueryCo" + + "mpleteResponseProto\"t\n\035TerminateFragment" + + "RequestProto\022/\n\020query_identifier\030\001 \001(\0132\025" + + ".QueryIdentifierProto\022\"\n\032fragment_identi" + + "fier_string\030\002 \001(\t\" \n\036TerminateFragmentRe" + + "sponseProto\"\026\n\024GetTokenRequestProto\"&\n\025G", + "etTokenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020S" + + "ourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RU" + + "NNING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEP" + + "TED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316" + + "\002\n\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Su" + + "bmitWorkRequestProto\032\030.SubmitWorkRespons" + + "eProto\022W\n\022sourceStateUpdated\022\037.SourceSta" + + "teUpdatedRequestProto\032 .SourceStateUpdat" + + "edResponseProto\022H\n\rqueryComplete\022\032.Query" + + "CompleteRequestProto\032\033.QueryCompleteResp", + "onseProto\022T\n\021terminateFragment\022\036.Termina" + + "teFragmentRequestProto\032\037.TerminateFragme" + + "ntResponseProto2]\n\026LlapManagementProtoco" 
+ + "l\022C\n\022getDelegationToken\022\025.GetTokenReques" + + "tProto\032\026.GetTokenResponseProtoBH\n&org.ap" + + "ache.hadoop.hive.llap.daemon.rpcB\030LlapDa" + + "emonProtocolProtos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -13826,69 +14467,75 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { internal_static_FragmentSpecProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_FragmentSpecProto_descriptor, - new java.lang.String[] { "FragmentIdentifierString", "DagName", "VertexName", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", "FragmentNumber", "AttemptNumber", }); + new java.lang.String[] { "FragmentIdentifierString", "DagName", "DagId", "VertexName", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", "FragmentNumber", "AttemptNumber", }); internal_static_FragmentRuntimeInfo_descriptor = getDescriptor().getMessageTypes().get(5); internal_static_FragmentRuntimeInfo_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_FragmentRuntimeInfo_descriptor, new java.lang.String[] { "NumSelfAndUpstreamTasks", "NumSelfAndUpstreamCompletedTasks", "WithinDagPriority", "DagStartTime", "FirstAttemptStartTime", "CurrentAttemptStartTime", }); - internal_static_SubmitWorkRequestProto_descriptor = + internal_static_QueryIdentifierProto_descriptor = getDescriptor().getMessageTypes().get(6); + internal_static_QueryIdentifierProto_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_QueryIdentifierProto_descriptor, + new java.lang.String[] { "AppIdentifier", "DagIdentifier", }); + internal_static_SubmitWorkRequestProto_descriptor = + getDescriptor().getMessageTypes().get(7); 
internal_static_SubmitWorkRequestProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SubmitWorkRequestProto_descriptor, new java.lang.String[] { "ContainerIdString", "AmHost", "AmPort", "TokenIdentifier", "CredentialsBinary", "User", "ApplicationIdString", "AppAttemptNumber", "FragmentSpec", "FragmentRuntimeInfo", }); internal_static_SubmitWorkResponseProto_descriptor = - getDescriptor().getMessageTypes().get(7); + getDescriptor().getMessageTypes().get(8); internal_static_SubmitWorkResponseProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SubmitWorkResponseProto_descriptor, new java.lang.String[] { "SubmissionState", }); internal_static_SourceStateUpdatedRequestProto_descriptor = - getDescriptor().getMessageTypes().get(8); + getDescriptor().getMessageTypes().get(9); internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SourceStateUpdatedRequestProto_descriptor, - new java.lang.String[] { "DagName", "SrcName", "State", }); + new java.lang.String[] { "QueryIdentifier", "SrcName", "State", }); internal_static_SourceStateUpdatedResponseProto_descriptor = - getDescriptor().getMessageTypes().get(9); + getDescriptor().getMessageTypes().get(10); internal_static_SourceStateUpdatedResponseProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SourceStateUpdatedResponseProto_descriptor, new java.lang.String[] { }); internal_static_QueryCompleteRequestProto_descriptor = - getDescriptor().getMessageTypes().get(10); + getDescriptor().getMessageTypes().get(11); internal_static_QueryCompleteRequestProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_QueryCompleteRequestProto_descriptor, - new java.lang.String[] { "QueryId", "DagName", "DeleteDelay", }); + new java.lang.String[] { 
"QueryId", "QueryIdentifier", "DeleteDelay", }); internal_static_QueryCompleteResponseProto_descriptor = - getDescriptor().getMessageTypes().get(11); + getDescriptor().getMessageTypes().get(12); internal_static_QueryCompleteResponseProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_QueryCompleteResponseProto_descriptor, new java.lang.String[] { }); internal_static_TerminateFragmentRequestProto_descriptor = - getDescriptor().getMessageTypes().get(12); + getDescriptor().getMessageTypes().get(13); internal_static_TerminateFragmentRequestProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_TerminateFragmentRequestProto_descriptor, - new java.lang.String[] { "QueryId", "DagName", "FragmentIdentifierString", }); + new java.lang.String[] { "QueryIdentifier", "FragmentIdentifierString", }); internal_static_TerminateFragmentResponseProto_descriptor = - getDescriptor().getMessageTypes().get(13); + getDescriptor().getMessageTypes().get(14); internal_static_TerminateFragmentResponseProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_TerminateFragmentResponseProto_descriptor, new java.lang.String[] { }); internal_static_GetTokenRequestProto_descriptor = - getDescriptor().getMessageTypes().get(14); + getDescriptor().getMessageTypes().get(15); internal_static_GetTokenRequestProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_GetTokenRequestProto_descriptor, new java.lang.String[] { }); internal_static_GetTokenResponseProto_descriptor = - getDescriptor().getMessageTypes().get(15); + getDescriptor().getMessageTypes().get(16); internal_static_GetTokenResponseProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_GetTokenResponseProto_descriptor, diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java 
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java index 3c9ad24..f1fc285 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java @@ -27,6 +27,7 @@ private static final String HISTORY_START_TIME = "StartTime"; private static final String HISTORY_END_TIME = "EndTime"; private static final String HISTORY_DAG_NAME = "DagName"; + private static final String HISTORY_DAG_ID = "DagId"; private static final String HISTORY_VERTEX_NAME = "VertexName"; private static final String HISTORY_TASK_ID = "TaskId"; private static final String HISTORY_ATTEMPT_ID = "TaskAttemptId"; @@ -41,29 +42,30 @@ public static void logFragmentStart(String applicationIdStr, String containerIdStr, String hostname, - String dagName, String vertexName, int taskId, + String dagName, int dagIdentifier, String vertexName, int taskId, int attemptId) { HISTORY_LOGGER.info( - constructFragmentStartString(applicationIdStr, containerIdStr, hostname, dagName, + constructFragmentStartString(applicationIdStr, containerIdStr, hostname, dagName, dagIdentifier, vertexName, taskId, attemptId)); } public static void logFragmentEnd(String applicationIdStr, String containerIdStr, String hostname, - String dagName, String vertexName, int taskId, int attemptId, + String dagName, int dagIdentifier, String vertexName, int taskId, int attemptId, String threadName, long startTime, boolean failed) { HISTORY_LOGGER.info(constructFragmentEndString(applicationIdStr, containerIdStr, hostname, - dagName, vertexName, taskId, attemptId, threadName, startTime, failed)); + dagName, dagIdentifier, vertexName, taskId, attemptId, threadName, startTime, failed)); } private static String constructFragmentStartString(String applicationIdStr, String containerIdStr, - String hostname, String dagName, + String hostname, String dagName, int dagIdentifier, String vertexName, int taskId, int attemptId) { 
HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_START); lb.addHostName(hostname); lb.addAppid(applicationIdStr); lb.addContainerId(containerIdStr); lb.addDagName(dagName); + lb.addDagId(dagIdentifier); lb.addVertexName(vertexName); lb.addTaskId(taskId); lb.addTaskAttemptId(attemptId); @@ -72,7 +74,7 @@ private static String constructFragmentStartString(String applicationIdStr, Stri } private static String constructFragmentEndString(String applicationIdStr, String containerIdStr, - String hostname, String dagName, + String hostname, String dagName, int dagIdentifier, String vertexName, int taskId, int attemptId, String threadName, long startTime, boolean succeeded) { HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_END); @@ -80,6 +82,7 @@ private static String constructFragmentEndString(String applicationIdStr, String lb.addAppid(applicationIdStr); lb.addContainerId(containerIdStr); lb.addDagName(dagName); + lb.addDagId(dagIdentifier); lb.addVertexName(vertexName); lb.addTaskId(taskId); lb.addTaskAttemptId(attemptId); @@ -113,6 +116,10 @@ HistoryLineBuilder addDagName(String dagName) { return setKeyValue(HISTORY_DAG_NAME, dagName); } + HistoryLineBuilder addDagId(int dagId) { + return setKeyValue(HISTORY_DAG_ID, String.valueOf(dagId)); + } + HistoryLineBuilder addVertexName(String vertexName) { return setKeyValue(HISTORY_VERTEX_NAME, vertexName); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java index 7cb433b..e2caec2 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.llap.daemon; +import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier; import org.apache.hadoop.security.token.Token; import 
org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -24,6 +25,6 @@ // inferred from this. // Passing in parameters until there's some dag information stored and tracked in the daemon. void taskKilled(String amLocation, int port, String user, - Token jobToken, String queryId, String dagName, + Token jobToken, QueryIdentifier queryIdentifier, TezTaskAttemptID taskAttemptId); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java index 4e62a68..7f9553d 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java @@ -14,7 +14,9 @@ package org.apache.hadoop.hive.llap.daemon; +import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier; + public interface QueryFailedHandler { - public void queryFailed(String queryId, String dagName); + public void queryFailed(QueryIdentifier queryIdentifier); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index f6711d8..d1ec715 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -172,9 +172,9 @@ public void serviceStop() { } public void registerTask(String amLocation, int port, String user, - Token jobToken, String queryId, String dagName) { + Token jobToken, QueryIdentifier queryIdentifier) { if (LOG.isTraceEnabled()) { - LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for dagName=" + dagName); + LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for queryIdentifier=" + queryIdentifier); } AMNodeInfo amNodeInfo; synchronized (knownAppMasters) { @@ -182,7 +182,7 @@ public void 
registerTask(String amLocation, int port, String user, amNodeInfo = knownAppMasters.get(amNodeId); if (amNodeInfo == null) { amNodeInfo = - new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, retryTimeout, socketFactory, + new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); knownAppMasters.put(amNodeId, amNodeInfo); // Add to the queue only the first time this is registered, and on @@ -190,7 +190,7 @@ public void registerTask(String amLocation, int port, String user, amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval); pendingHeartbeatQueeu.add(amNodeInfo); } - amNodeInfo.setCurrentDagName(dagName); + amNodeInfo.setCurrentQueryIdentifier(queryIdentifier); amNodeInfo.incrementAndGetTaskCount(); } } @@ -214,12 +214,12 @@ public void unregisterTask(String amLocation, int port) { } public void taskKilled(String amLocation, int port, String user, Token jobToken, - final String queryId, final String dagName, final TezTaskAttemptID taskAttemptId) { + final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) { // Not re-using the connection for the AM heartbeat - which may or may not be open by this point. // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection. LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); AMNodeInfo amNodeInfo = - new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, retryTimeout, socketFactory, + new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); // Even if the service hasn't started up. 
It's OK to make this invocation since this will @@ -251,8 +251,8 @@ protected Void callInternal() { synchronized (knownAppMasters) { if (LOG.isDebugEnabled()) { LOG.debug( - "Removing am {} with last associated dag{} from heartbeat with taskCount={}, amFailed={}", - amNodeInfo.amNodeId, amNodeInfo.getCurrentDagName(), amNodeInfo.getTaskCount(), + "Removing am {} with last associated dag {} from heartbeat with taskCount={}, amFailed={}", + amNodeInfo.amNodeId, amNodeInfo.getCurrentQueryIdentifier(), amNodeInfo.getTaskCount(), amNodeInfo.hasAmFailed(), amNodeInfo); } knownAppMasters.remove(amNodeInfo.amNodeId); @@ -272,11 +272,11 @@ public void onSuccess(Void result) { @Override public void onFailure(Throwable t) { - String currentDagName = amNodeInfo.getCurrentDagName(); + QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier(); amNodeInfo.setAmFailed(true); LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}", - amNodeInfo.amNodeId, currentDagName, t); - queryFailedHandler.queryFailed(null, currentDagName); + amNodeInfo.amNodeId, currentQueryIdentifier, t); + queryFailedHandler.queryFailed(currentQueryIdentifier); } }); } @@ -339,11 +339,11 @@ protected Void callInternal() { amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()), nodeId.getPort()); } catch (IOException e) { - String currentDagName = amNodeInfo.getCurrentDagName(); + QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier(); amNodeInfo.setAmFailed(true); LOG.warn("Failed to communicated with AM at {}. 
Killing remaining fragments for query {}", - amNodeInfo.amNodeId, currentDagName, e); - queryFailedHandler.queryFailed(null, currentDagName); + amNodeInfo.amNodeId, currentQueryIdentifier, e); + queryFailedHandler.queryFailed(currentQueryIdentifier); } catch (InterruptedException e) { if (!isShutdown.get()) { LOG.warn("Interrupted while trying to send heartbeat to AM {}", amNodeInfo.amNodeId, e); @@ -370,21 +370,21 @@ protected Void callInternal() { private final long timeout; private final SocketFactory socketFactory; private final AtomicBoolean amFailed = new AtomicBoolean(false); - private String currentDagName; + private QueryIdentifier currentQueryIdentifier; private LlapTaskUmbilicalProtocol umbilical; private long nextHeartbeatTime; public AMNodeInfo(LlapNodeId amNodeId, String user, Token jobToken, - String currentDagName, + QueryIdentifier currentQueryIdentifier, RetryPolicy retryPolicy, long timeout, SocketFactory socketFactory, Configuration conf) { this.user = user; this.jobToken = jobToken; - this.currentDagName = currentDagName; + this.currentQueryIdentifier = currentQueryIdentifier; this.retryPolicy = retryPolicy; this.timeout = timeout; this.socketFactory = socketFactory; @@ -439,12 +439,12 @@ int getTaskCount() { return taskCount.get(); } - public synchronized String getCurrentDagName() { - return currentDagName; + public synchronized QueryIdentifier getCurrentQueryIdentifier() { + return currentQueryIdentifier; } - public synchronized void setCurrentDagName(String currentDagName) { - this.currentDagName = currentDagName; + public synchronized void setCurrentQueryIdentifier(QueryIdentifier queryIdentifier) { + this.currentQueryIdentifier = queryIdentifier; } synchronized void setNextHeartbeatTime(long nextTime) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 0d85671..a2a55cc 100644 --- 
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -60,6 +60,8 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.hadoop.shim.HadoopShim; +import org.apache.tez.hadoop.shim.HadoopShimsLoader; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,6 +87,7 @@ private final Configuration conf; private final TaskRunnerCallable.ConfParams confParams; private final KilledTaskHandler killedTaskHandler = new KilledTaskHandlerImpl(); + private final HadoopShim tezHadoopShim; public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize, boolean enablePreemption, String[] localDirsBase, AtomicReference localShufflePort, @@ -122,6 +125,7 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi conf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT, TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT) ); + tezHadoopShim = new HadoopShimsLoader(conf).getHadoopShim(); LOG.info("ContainerRunnerImpl config: " + "memoryPerExecutorDerviced=" + memoryPerExecutor @@ -149,7 +153,7 @@ protected void serviceStop() throws Exception { @Override public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException { HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(), - localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), + localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), request.getFragmentSpec().getDagId(), request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(), request.getFragmentSpec().getAttemptNumber()); if (LOG.isInfoEnabled()) { @@ -172,8 +176,10 @@ public 
SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws fragmentSpec.getFragmentIdentifierString()); int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); + QueryIdentifier queryIdentifier = new QueryIdentifier(request.getApplicationIdString(), dagIdentifier); + QueryFragmentInfo fragmentInfo = queryTracker - .registerFragment(null, request.getApplicationIdString(), fragmentSpec.getDagName(), + .registerFragment(queryIdentifier, request.getApplicationIdString(), fragmentSpec.getDagName(), dagIdentifier, fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(), fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec()); @@ -205,7 +211,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()), new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, - this); + this, tezHadoopShim); submissionState = executorService.schedule(callable); if (LOG.isInfoEnabled()) { @@ -239,28 +245,37 @@ public LlapExecutionContext(String hostname, QueryTracker queryTracker) { @Override public void initializeHook(TezProcessor source) { - queryTracker.registerDagQueryId(source.getContext().getDAGName(), + queryTracker.registerDagQueryId( + new QueryIdentifier(source.getContext().getApplicationId().toString(), + source.getContext().getDagIdentifier()), HiveConf.getVar(source.getConf(), HiveConf.ConfVars.HIVEQUERYID)); } } @Override - public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) { + public SourceStateUpdatedResponseProto sourceStateUpdated( + SourceStateUpdatedRequestProto request) { LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request)); - queryTracker.registerSourceStateChange(request.getDagName(), 
request.getSrcName(), + queryTracker.registerSourceStateChange( + new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(), + request.getQueryIdentifier().getDagIdentifier()), request.getSrcName(), request.getState()); return SourceStateUpdatedResponseProto.getDefaultInstance(); } @Override public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) { - LOG.info("Processing queryComplete notification for {}", request.getDagName()); + QueryIdentifier queryIdentifier = + new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(), + request.getQueryIdentifier().getDagIdentifier()); + LOG.info("Processing queryComplete notification for {}", queryIdentifier); List knownFragments = - queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay()); - LOG.info("DBG: Pending fragment count for completed query {} = {}", request.getDagName(), + queryTracker + .queryComplete(queryIdentifier, request.getDeleteDelay()); + LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier, knownFragments.size()); for (QueryFragmentInfo fragmentInfo : knownFragments) { - LOG.info("DBG: Issuing killFragment for completed query {} {}", request.getDagName(), + LOG.info("DBG: Issuing killFragment for completed query {} {}", queryIdentifier, fragmentInfo.getFragmentIdentifierString()); executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); } @@ -276,7 +291,9 @@ public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequest private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) { StringBuilder sb = new StringBuilder(); - sb.append("dagName=").append(request.getDagName()) + QueryIdentifier queryIdentifier = new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(), + request.getQueryIdentifier().getDagIdentifier()); + sb.append("queryIdentifier=").append(queryIdentifier) .append(", 
").append("sourceName=").append(request.getSrcName()) .append(", ").append("state=").append(request.getState()); return sb.toString(); @@ -342,14 +359,14 @@ public void fragmentComplete(QueryFragmentInfo fragmentInfo) { } @Override - public void queryFailed(String queryId, String dagName) { - LOG.info("Processing query failed notification for {}", dagName); + public void queryFailed(QueryIdentifier queryIdentifier) { + LOG.info("Processing query failed notification for {}", queryIdentifier); List knownFragments = - queryTracker.queryComplete(queryId, dagName, -1); - LOG.info("DBG: Pending fragment count for failed query {} = {}", dagName, + queryTracker.queryComplete(queryIdentifier, -1); + LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier, knownFragments.size()); for (QueryFragmentInfo fragmentInfo : knownFragments) { - LOG.info("DBG: Issuing killFragment for failed query {} {}", dagName, + LOG.info("DBG: Issuing killFragment for failed query {} {}", queryIdentifier, fragmentInfo.getFragmentIdentifierString()); executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); } @@ -359,9 +376,9 @@ public void queryFailed(String queryId, String dagName) { @Override public void taskKilled(String amLocation, int port, String user, - Token jobToken, String queryId, String dagName, + Token jobToken, QueryIdentifier queryIdentifier, TezTaskAttemptID taskAttemptId) { - amReporter.taskKilled(amLocation, port, user, jobToken, queryId, dagName, taskAttemptId); + amReporter.taskKilled(amLocation, port, user, jobToken, queryIdentifier, taskAttemptId); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index ddedfbf..2af1758 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -435,8 +435,8 @@ public void 
uncaughtException(Thread t, Throwable e) { private class QueryFailedHandlerProxy implements QueryFailedHandler { @Override - public void queryFailed(String queryId, String dagName) { - containerRunner.queryFailed(queryId, dagName); + public void queryFailed(QueryIdentifier queryIdentifier) { + containerRunner.queryFailed(queryIdentifier); } } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index fc66254..bb9f341 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -350,13 +350,15 @@ private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) { float progress = 0; if (task.hasInitialized()) { progress = task.getProgress(); + // TODO HIVE-12449. Make use of progress notifications once Hive starts sending them out. + // progressNotified = task.getAndClearProgressNotification(); if (sendCounters) { // send these potentially large objects at longer intervals to avoid overloading the AM counters = task.getCounters(); stats = task.getTaskStatistics(); } } - return new TaskStatusUpdateEvent(counters, progress, stats); + return new TaskStatusUpdateEvent(counters, progress, stats, true); } /** diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java new file mode 100644 index 0000000..96e77e4 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +/** + * An identifier for a query, which is unique. + */ +public final class QueryIdentifier { + + private final String appIdentifier; + private final int dagIdentifier; + + + public QueryIdentifier(String appIdentifier, int dagIdentifier) { + this.appIdentifier = appIdentifier; + this.dagIdentifier = dagIdentifier; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || !getClass().isAssignableFrom(o.getClass())) { + return false; + } + + QueryIdentifier that = (QueryIdentifier) o; + + if (dagIdentifier != that.dagIdentifier) { + return false; + } + return appIdentifier.equals(that.appIdentifier); + + } + + @Override + public int hashCode() { + int result = appIdentifier.hashCode(); + result = 31 * result + dagIdentifier; + return result; + } + + @Override + public String toString() { + return "QueryIdentifier{" + + "appIdentifier='" + appIdentifier + '\'' + + ", dagIdentifier=" + dagIdentifier + + '}'; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 27f2d4c..8bec95f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; public class QueryInfo { - private final String queryId; + private final 
QueryIdentifier queryIdentifier; private final String appIdString; private final String dagName; private final int dagIdentifier; @@ -54,10 +54,10 @@ private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker(); - public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier, + public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagName, int dagIdentifier, String user, ConcurrentMap sourceStateMap, String[] localDirsBase, FileSystem localFs) { - this.queryId = queryId; + this.queryIdentifier = queryIdentifier; this.appIdString = appIdString; this.dagName = dagName; this.dagIdentifier = dagIdentifier; @@ -67,18 +67,14 @@ public QueryInfo(String queryId, String appIdString, String dagName, int dagIden this.localFs = localFs; } - public String getQueryId() { - return queryId; + public QueryIdentifier getQueryIdentifier() { + return queryIdentifier; } public String getAppIdString() { return appIdString; } - public String getDagName() { - return dagName; - } - public int getDagIdentifier() { return dagIdentifier; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index 6deaefc..0676edd 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -56,8 +56,7 @@ private final ScheduledExecutorService executorService; - // TODO Make use if the query id for cachin when this is available. - private final ConcurrentHashMap queryInfoMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap queryInfoMap = new ConcurrentHashMap<>(); private final String[] localDirsBase; private final FileSystem localFs; @@ -70,22 +69,25 @@ // Alternately - send in an explicit dag start message before any other message is processed. 
// Multiple threads communicating from a single AM gets in the way of this. - // Keeps track of completed dags. Assumes dag names are unique across AMs. - private final Set completedDagMap = Collections.newSetFromMap( - new ConcurrentHashMap()); + // Keeps track of completed DAGS. QueryIdentifiers need to be unique across applications. + private final Set completedDagMap = + Collections.newSetFromMap(new ConcurrentHashMap()); private final Lock lock = new ReentrantLock(); - private final ConcurrentMap dagSpecificLocks = new ConcurrentHashMap<>(); + private final ConcurrentMap dagSpecificLocks = new ConcurrentHashMap<>(); // Tracks various maps for dagCompletions. This is setup here since stateChange messages // may be processed by a thread which ends up executing before a task. - private final ConcurrentMap> - sourceCompletionMap = new ConcurrentHashMap<>(); + private final ConcurrentMap> + sourceCompletionMap = new ConcurrentHashMap<>(); - // Tracks queryId by dagName. This can only be set when config is parsed in TezProcessor, + // Tracks HiveQueryId by QueryIdentifier. This can only be set when config is parsed in TezProcessor. // all the other existing code passes queryId equal to 0 everywhere. - private final ConcurrentHashMap dagNameToQueryId = new ConcurrentHashMap<>(); + // If we switch the runtime and move to parsing the payload in the AM - the actual hive queryId could + // be sent over the wire from the AM, and will take the place of AppId+dagId in QueryIdentifier. 
+ private final ConcurrentHashMap queryIdentifierToHiveQueryId = + new ConcurrentHashMap<>(); public QueryTracker(Configuration conf, String[] localDirsBase) { super("QueryTracker"); @@ -107,7 +109,7 @@ public QueryTracker(Configuration conf, String[] localDirsBase) { /** * Register a new fragment for a specific query - * @param queryId + * @param queryIdentifier * @param appIdString * @param dagName * @param dagIdentifier @@ -117,23 +119,23 @@ public QueryTracker(Configuration conf, String[] localDirsBase) { * @param user * @throws IOException */ - QueryFragmentInfo registerFragment(String queryId, String appIdString, String dagName, + QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagName, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user, FragmentSpecProto fragmentSpec) throws IOException { - ReadWriteLock dagLock = getDagLock(dagName); + ReadWriteLock dagLock = getDagLock(queryIdentifier); dagLock.readLock().lock(); try { - if (!completedDagMap.contains(dagName)) { - QueryInfo queryInfo = queryInfoMap.get(dagName); + if (!completedDagMap.contains(queryIdentifier)) { + QueryInfo queryInfo = queryInfoMap.get(queryIdentifier); if (queryInfo == null) { - queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user, - getSourceCompletionMap(dagName), localDirsBase, localFs); - queryInfoMap.putIfAbsent(dagName, queryInfo); + queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user, + getSourceCompletionMap(queryIdentifier), localDirsBase, localFs); + queryInfoMap.putIfAbsent(queryIdentifier, queryInfo); } return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec); } else { // Cleanup the dag lock here, since it may have been created after the query completed - dagSpecificLocks.remove(dagName); + dagSpecificLocks.remove(queryIdentifier); throw new RuntimeException( "Dag " + dagName + " already complete. 
Rejecting fragment [" + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]"); @@ -148,12 +150,12 @@ QueryFragmentInfo registerFragment(String queryId, String appIdString, String da * @param fragmentInfo */ void fragmentComplete(QueryFragmentInfo fragmentInfo) { - String dagName = fragmentInfo.getQueryInfo().getDagName(); - QueryInfo queryInfo = queryInfoMap.get(dagName); + QueryIdentifier qId = fragmentInfo.getQueryInfo().getQueryIdentifier(); + QueryInfo queryInfo = queryInfoMap.get(qId); if (queryInfo == null) { // Possible because a queryComplete message from the AM can come in first - KILL / SUCCESSFUL, // before the fragmentComplete is reported - LOG.info("Ignoring fragmentComplete message for unknown query"); + LOG.info("Ignoring fragmentComplete message for unknown query: {}", qId); } else { queryInfo.unregisterFragment(fragmentInfo); } @@ -161,42 +163,40 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) { /** * Register completion for a query - * @param queryId - * @param dagName + * @param queryIdentifier * @param deleteDelay */ - List queryComplete(String queryId, String dagName, long deleteDelay) { + List queryComplete(QueryIdentifier queryIdentifier, long deleteDelay) { if (deleteDelay == -1) { deleteDelay = defaultDeleteDelaySeconds; } - ReadWriteLock dagLock = getDagLock(dagName); + ReadWriteLock dagLock = getDagLock(queryIdentifier); dagLock.writeLock().lock(); try { - rememberCompletedDag(dagName); - LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds", - dagName, deleteDelay); - QueryInfo queryInfo = queryInfoMap.remove(dagName); + rememberCompletedDag(queryIdentifier); + LOG.info("Processing queryComplete for queryIdentifier={} with deleteDelay={} seconds", queryIdentifier, + deleteDelay); + QueryInfo queryInfo = queryInfoMap.remove(queryIdentifier); if (queryInfo == null) { - LOG.warn("Ignoring query complete for unknown dag: {}", dagName); + LOG.warn("Ignoring query complete for unknown dag: {}", 
queryIdentifier); return Collections.emptyList(); } String[] localDirs = queryInfo.getLocalDirsNoCreate(); if (localDirs != null) { for (String localDir : localDirs) { cleanupDir(localDir, deleteDelay); - ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.getDagIdentifier()); + ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier()); } } // Clearing this before sending a kill is OK, since canFinish will change to false. // Ideally this should be a state machine where kills are issued to the executor, // and the structures are cleaned up once all tasks complete. New requests, however, // should not be allowed after a query complete is received. - sourceCompletionMap.remove(dagName); - String savedQueryId = dagNameToQueryId.remove(dagName); - queryId = queryId == null ? savedQueryId : queryId; - dagSpecificLocks.remove(dagName); - if (queryId != null) { - ObjectCacheFactory.removeLlapQueryCache(queryId); + sourceCompletionMap.remove(queryIdentifier); + String savedQueryId = queryIdentifierToHiveQueryId.remove(queryIdentifier); + dagSpecificLocks.remove(queryIdentifier); + if (savedQueryId != null) { + ObjectCacheFactory.removeLlapQueryCache(savedQueryId); } return queryInfo.getRegisteredFragments(); } finally { @@ -206,24 +206,24 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) { - public void rememberCompletedDag(String dagName) { - if (completedDagMap.add(dagName)) { + public void rememberCompletedDag(QueryIdentifier queryIdentifier) { + if (completedDagMap.add(queryIdentifier)) { // We will remember completed DAG for an hour to avoid execution out-of-order fragments. 
- executorService.schedule(new DagMapCleanerCallable(dagName), 1, TimeUnit.HOURS); + executorService.schedule(new DagMapCleanerCallable(queryIdentifier), 1, TimeUnit.HOURS); } else { - LOG.warn("Couldn't add {} to completed dag set", dagName); + LOG.warn("Couldn't add {} to completed dag set", queryIdentifier); } } /** * Register an update to a source within an executing dag - * @param dagName + * @param queryIdentifier * @param sourceName * @param sourceState */ - void registerSourceStateChange(String dagName, String sourceName, SourceStateProto sourceState) { - getSourceCompletionMap(dagName).put(sourceName, sourceState); - QueryInfo queryInfo = queryInfoMap.get(dagName); + void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceName, SourceStateProto sourceState) { + getSourceCompletionMap(queryIdentifier).put(sourceName, sourceState); + QueryInfo queryInfo = queryInfoMap.get(queryIdentifier); if (queryInfo != null) { queryInfo.sourceStateUpdated(sourceName); } else { @@ -233,13 +233,13 @@ void registerSourceStateChange(String dagName, String sourceName, SourceStatePro } - private ReadWriteLock getDagLock(String dagName) { + private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) { lock.lock(); try { - ReadWriteLock dagLock = dagSpecificLocks.get(dagName); + ReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier); if (dagLock == null) { dagLock = new ReentrantReadWriteLock(); - dagSpecificLocks.put(dagName, dagLock); + dagSpecificLocks.put(queryIdentifier, dagLock); } return dagLock; } finally { @@ -247,20 +247,20 @@ private ReadWriteLock getDagLock(String dagName) { } } - private ConcurrentMap getSourceCompletionMap(String dagName) { - ConcurrentMap dagMap = sourceCompletionMap.get(dagName); + private ConcurrentMap getSourceCompletionMap(QueryIdentifier queryIdentifier) { + ConcurrentMap dagMap = sourceCompletionMap.get(queryIdentifier); if (dagMap == null) { dagMap = new ConcurrentHashMap<>(); ConcurrentMap old = - 
sourceCompletionMap.putIfAbsent(dagName, dagMap); + sourceCompletionMap.putIfAbsent(queryIdentifier, dagMap); dagMap = (old != null) ? old : dagMap; } return dagMap; } - public void registerDagQueryId(String dagName, String queryId) { - if (queryId == null) return; - dagNameToQueryId.putIfAbsent(dagName, queryId); + public void registerDagQueryId(QueryIdentifier queryIdentifier, String hiveQueryIdString) { + if (hiveQueryIdString == null) return; + queryIdentifierToHiveQueryId.putIfAbsent(queryIdentifier, hiveQueryIdString); } @Override @@ -302,15 +302,15 @@ protected Void callInternal() { } private class DagMapCleanerCallable extends CallableWithNdc { - private final String dagName; + private final QueryIdentifier queryIdentifier; - private DagMapCleanerCallable(String dagName) { - this.dagName = dagName; + private DagMapCleanerCallable(QueryIdentifier queryIdentifier) { + this.queryIdentifier = queryIdentifier; } @Override protected Void callInternal() { - completedDagMap.remove(dagName); + completedDagMap.remove(queryIdentifier); return null; } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index f03a2ff..ede2a03 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -51,6 +51,7 @@ import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; @@ -100,6 +101,7 @@ private final LlapDaemonExecutorMetrics metrics; private final String requestId; private final String queryId; + private final 
HadoopShim tezHadoopShim; private boolean shouldRunTask = true; final Stopwatch runtimeWatch = new Stopwatch(); final Stopwatch killtimerWatch = new Stopwatch(); @@ -115,7 +117,8 @@ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo frag long memoryAvailable, AMReporter amReporter, ConfParams confParams, LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler, - FragmentCompletionHandler fragmentCompleteHandler) { + FragmentCompletionHandler fragmentCompleteHandler, + HadoopShim tezHadoopShim) { this.request = request; this.fragmentInfo = fragmentInfo; this.conf = conf; @@ -131,7 +134,7 @@ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo frag // Register with the AMReporter when the callable is setup. Unregister once it starts running. if (jobToken != null) { this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), - request.getUser(), jobToken, null, request.getFragmentSpec().getDagName()); + request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier()); } this.metrics = metrics; this.requestId = request.getFragmentSpec().getFragmentIdentifierString(); @@ -139,6 +142,7 @@ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo frag this.queryId = request.getFragmentSpec().getDagName(); this.killedTaskHandler = killedTaskHandler; this.fragmentCompletionHanler = fragmentCompleteHandler; + this.tezHadoopShim = tezHadoopShim; } public long getStartTime() { @@ -216,7 +220,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, - executionContext, memoryAvailable, false); + executionContext, memoryAvailable, false, tezHadoopShim); } } if (taskRunner == null) { @@ -297,9 +301,8 @@ public void killTask() { */ public void reportTaskKilled() { killedTaskHandler - .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken, null, - 
taskSpec.getDAGName(), - taskSpec.getTaskAttemptID()); + .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken, + fragmentInfo.getQueryInfo().getQueryIdentifier(), taskSpec.getTaskAttemptID()); } public boolean canFinish() { @@ -428,6 +431,7 @@ public void onSuccess(TaskRunner2Result result) { HistoryLogger .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), executionContext.getHostName(), request.getFragmentSpec().getDagName(), + fragmentInfo.getQueryInfo().getDagIdentifier(), request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(), request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, @@ -445,6 +449,7 @@ public void onFailure(Throwable t) { HistoryLogger .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), executionContext.getHostName(), request.getFragmentSpec().getDagName(), + fragmentInfo.getQueryInfo().getDagIdentifier(), request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(), request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java index 7428a6a..f61d62f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java @@ -262,5 +262,4 @@ public static SourceStateProto fromVertexState(VertexState state) { throw new RuntimeException("Unexpected state: " + state); } } - } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 5c370ee..eb6384f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ 
llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; @@ -62,7 +63,6 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; @@ -74,6 +74,7 @@ import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,7 +86,8 @@ private static final boolean isDebugEnabed = LOG.isDebugEnabled(); private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST; - private final ConcurrentMap credentialMap; + + private final ConcurrentMap credentialMap; // Tracks containerIds and taskAttemptIds, so can be kept independent of the running DAG. // When DAG specific cleanup happens, it'll be better to link this to a DAG though. 
@@ -104,7 +106,8 @@ private final ConcurrentMap pingedNodeMap = new ConcurrentHashMap<>(); - private volatile String currentDagName; + private volatile int currentDagId; + private volatile QueryIdentifierProto currentQueryIdentifierProto; public LlapTaskCommunicator( TaskCommunicatorContext taskCommunicatorContext) { @@ -226,8 +229,9 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task int priority) { super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority); - if (taskSpec.getDAGName() != currentDagName) { - resetCurrentDag(taskSpec.getDAGName()); + int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId(); + if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIdentifier())) { + resetCurrentDag(dagId); } @@ -251,7 +255,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task nodesForQuery.add(nodeId); sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs()); - FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo(taskSpec.getDAGName(), + FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo( taskSpec.getVertexName(), taskSpec.getTaskAttemptID().getTaskID().getId(), priority); SubmitWorkRequestProto requestProto; @@ -349,7 +353,7 @@ private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId, // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself if (nodeId != null) { TerminateFragmentRequestProto request = - TerminateFragmentRequestProto.newBuilder().setDagName(currentDagName) + TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifierProto) .setFragmentIdentifierString(taskAttemptId.toString()).build(); communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(), new 
LlapDaemonProtocolClientProxy.ExecuteRequestCallback() { @@ -370,12 +374,16 @@ public void indicateError(Throwable t) { } } + + + @Override - public void dagComplete(final String dagName) { - QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder().setDagName( - dagName).setDeleteDelay(deleteDelayOnDagComplete).build(); + public void dagComplete(final int dagIdentifier) { + QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder() + .setQueryIdentifier(constructQueryIdentifierProto(dagIdentifier)) + .setDeleteDelay(deleteDelayOnDagComplete).build(); for (final LlapNodeId llapNodeId : nodesForQuery) { - LOG.info("Sending dagComplete message for {}, to {}", dagName, llapNodeId); + LOG.info("Sending dagComplete message for {}, to {}", dagIdentifier, llapNodeId); communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(), new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() { @Override @@ -384,7 +392,7 @@ public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto resp @Override public void indicateError(Throwable t) { - LOG.warn("Failed to indicate dag complete dagId={} to node {}", dagName, llapNodeId); + LOG.warn("Failed to indicate dag complete dagId={} to node {}", dagIdentifier, llapNodeId); } }); } @@ -495,12 +503,12 @@ void nodePinged(String hostname, int port) { } } - private void resetCurrentDag(String newDagName) { + private void resetCurrentDag(int newDagId) { // Working on the assumption that a single DAG runs at a time per AM. 
- currentDagName = newDagName; - sourceStateTracker.resetState(newDagName); + currentQueryIdentifierProto = constructQueryIdentifierProto(newDagId); + sourceStateTracker.resetState(newDagId); nodesForQuery.clear(); - LOG.info("CurrentDag set to: " + newDagName); + LOG.info("CurrentDagId set to: " + newDagId + ", name=" + getContext().getCurrentDagName()); // TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which // is likely already happening. } @@ -518,10 +526,12 @@ private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerI // Credentials can change across DAGs. Ideally construct only once per DAG. taskCredentials.addAll(getContext().getCredentials()); - ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName()); + Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() == + taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); + ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); if (credentialsBinary == null) { credentialsBinary = serializeCredentials(getContext().getCredentials()); - credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate()); + credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate()); } else { credentialsBinary = credentialsBinary.duplicate(); } @@ -736,4 +746,10 @@ void unregisterContainer(ContainerId containerId) { } } + + private QueryIdentifierProto constructQueryIdentifierProto(int dagIdentifier) { + return QueryIdentifierProto.newBuilder() + .setAppIdentifier(getContext().getCurrentAppIdentifier()).setDagIdentifier(dagIdentifier) + .build(); + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java index 066fae5..fded9bf 100644 --- 
llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java @@ -24,6 +24,8 @@ import java.util.Set; import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.llap.LlapNodeId; @@ -31,7 +33,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.tezplugins.Converters; import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator; -import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; @@ -45,28 +46,33 @@ private final TaskCommunicatorContext taskCommunicatorContext; private final LlapTaskCommunicator taskCommunicator; + private final QueryIdentifierProto BASE_QUERY_IDENTIFIER; + // Tracks vertices for which notifications have been registered private final Set notificationRegisteredVertices = new HashSet<>(); private final Map sourceInfoMap = new HashMap<>(); private final Map nodeInfoMap = new HashMap<>(); - private volatile String currentDagName; + private volatile QueryIdentifierProto currentQueryIdentifier; public SourceStateTracker(TaskCommunicatorContext taskCommunicatorContext, LlapTaskCommunicator taskCommunicator) { this.taskCommunicatorContext = taskCommunicatorContext; this.taskCommunicator = taskCommunicator; + BASE_QUERY_IDENTIFIER = QueryIdentifierProto.newBuilder() + .setAppIdentifier(taskCommunicatorContext.getCurrentAppIdentifier()).build(); } /** * To be invoked after each DAG completes. 
*/ - public synchronized void resetState(String newDagName) { + public synchronized void resetState(int newDagId) { sourceInfoMap.clear(); nodeInfoMap.clear(); notificationRegisteredVertices.clear(); - this.currentDagName = newDagName; + this.currentQueryIdentifier = + QueryIdentifierProto.newBuilder(BASE_QUERY_IDENTIFIER).setDagIdentifier(newDagId).build(); } /** @@ -139,16 +145,16 @@ public synchronized void sourceStateUpdated(String sourceName, VertexState sourc } + // Assumes serialized DAGs within an AM, and a reset of structures after each DAG completes. /** * Constructs FragmentRuntimeInfo for scheduling within LLAP daemons. * Also caches state based on state updates. - * @param dagName * @param vertexName * @param fragmentNumber * @param priority * @return */ - public synchronized FragmentRuntimeInfo getFragmentRuntimeInfo(String dagName, String vertexName, int fragmentNumber, + public synchronized FragmentRuntimeInfo getFragmentRuntimeInfo(String vertexName, int fragmentNumber, int priority) { FragmentRuntimeInfo.Builder builder = FragmentRuntimeInfo.newBuilder(); maybeRegisterForVertexUpdates(vertexName); @@ -282,9 +288,8 @@ private boolean isSourceOfInterest(InputSpec inputSpec) { void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) { taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(), - SourceStateUpdatedRequestProto.newBuilder().setDagName(currentDagName).setSrcName( - sourceName) - .setState(Converters.fromVertexState(state)).build()); + SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifier) + .setSrcName(sourceName).setState(Converters.fromVertexState(state)).build()); } diff --git llap-server/src/protobuf/LlapDaemonProtocol.proto llap-server/src/protobuf/LlapDaemonProtocol.proto index a2d944f..944c96c 100644 --- llap-server/src/protobuf/LlapDaemonProtocol.proto +++ llap-server/src/protobuf/LlapDaemonProtocol.proto @@ -50,6 +50,7 @@ message GroupInputSpecProto { 
message FragmentSpecProto { optional string fragment_identifier_string = 1; optional string dag_name = 2; + optional int32 dag_id = 11; optional string vertex_name = 3; optional EntityDescriptorProto processor_descriptor = 4; repeated IOSpecProto input_specs = 5; @@ -74,6 +75,11 @@ enum SourceStateProto { S_RUNNING = 2; } +message QueryIdentifierProto { + optional string app_identifier = 1; + optional int32 dag_identifier = 2; +} + message SubmitWorkRequestProto { optional string container_id_string = 1; optional string am_host = 2; @@ -98,7 +104,7 @@ message SubmitWorkResponseProto { } message SourceStateUpdatedRequestProto { - optional string dag_name = 1; + optional QueryIdentifierProto query_identifier = 1; optional string src_name = 2; optional SourceStateProto state = 3; } @@ -108,17 +114,16 @@ message SourceStateUpdatedResponseProto { message QueryCompleteRequestProto { optional string query_id = 1; - optional string dag_name = 2; - optional int64 delete_delay = 3 [default = 0]; + optional QueryIdentifierProto query_identifier = 2; + optional int64 delete_delay = 4 [default = 0]; } message QueryCompleteResponseProto { } message TerminateFragmentRequestProto { - optional string query_id = 1; - optional string dag_name = 2; - optional string fragment_identifier_string = 7; + optional QueryIdentifierProto query_identifier = 1; + optional string fragment_identifier_string = 2; } message TerminateFragmentResponseProto { diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 38af07e..d3ba6dd 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -33,6 +33,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import 
org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; @@ -125,7 +126,7 @@ public MockRequest(SubmitWorkRequestProto requestProto, new ExecutionContextImpl("localhost"), null, new Credentials(), 0, null, null, mock( LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock( - FragmentCompletionHandler.class)); + FragmentCompletionHandler.class), new DefaultHadoopShim()); this.workTime = workTime; this.canFinish = canFinish; } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java new file mode 100644 index 0000000..39a3865 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java @@ -0,0 +1,48 @@ +package org.apache.hadoop.hive.llap.daemon.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import org.junit.Test; + +public class TestQueryIdentifier { + + @Test (timeout = 5000) + public void testEquality() { + + String appIdString1 = "app1"; + String appIdString2 = "app2"; + + int dagId1 = 1; + int dagId2 = 2; + + QueryIdentifier[] queryIdentifiers = new QueryIdentifier[4]; + + queryIdentifiers[0] = new QueryIdentifier(appIdString1, dagId1); + queryIdentifiers[1] = new QueryIdentifier(appIdString1, dagId2); + queryIdentifiers[2] = new QueryIdentifier(appIdString2, dagId1); + queryIdentifiers[3] = new QueryIdentifier(appIdString2, dagId2); + + for (int i = 0 ; i < 4 ; i++) { + for (int j = 0 ; j < 4 ; j++) { + if (i == j) { + assertEquals(queryIdentifiers[i], queryIdentifiers[j]); + } else { + assertNotEquals(queryIdentifiers[i], queryIdentifiers[j]); + } + } + } + + QueryIdentifier q11 = new QueryIdentifier(appIdString1, dagId1); + 
QueryIdentifier q12 = new QueryIdentifier(appIdString1, dagId2); + QueryIdentifier q21 = new QueryIdentifier(appIdString2, dagId1); + QueryIdentifier q22 = new QueryIdentifier(appIdString2, dagId2); + + assertEquals(queryIdentifiers[0], q11); + assertEquals(queryIdentifiers[1], q12); + assertEquals(queryIdentifiers[2], q21); + assertEquals(queryIdentifiers[3], q22); + + + } +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index ebfb430..73df985 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -39,6 +39,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; @@ -58,7 +59,7 @@ public MockRequest(SubmitWorkRequestProto requestProto, super(requestProto, mock(QueryFragmentInfo.class), conf, new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null, mock(KilledTaskHandler.class), mock( - FragmentCompletionHandler.class)); + FragmentCompletionHandler.class), new DefaultHadoopShim()); this.workTime = workTime; this.canFinish = canFinish; } diff --git pom.xml pom.xml index 848432c..2632fcb 100644 --- pom.xml +++ pom.xml @@ -163,7 +163,7 @@ 1.0.1 1.7.5 4.0.4 - 0.8.1-alpha + 0.8.2 2.2.0 1.5.0 2.10 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java index e9c14b1..45d3cd1 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java @@ -33,6 +33,7 @@ import java.util.TreeSet; import com.google.common.collect.LinkedListMultimap; +import org.apache.hadoop.mapred.split.SplitLocationProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -271,25 +272,27 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr HashMultimap. create(); boolean secondLevelGroupingDone = false; if ((mainWorkName.isEmpty()) || (inputName.compareTo(mainWorkName) == 0)) { + SplitLocationProvider splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG); for (Integer key : bucketToInitialSplitMap.keySet()) { InputSplit[] inputSplitArray = (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0])); Multimap groupedSplit = grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves, - availableSlots, inputName, mainWorkName.isEmpty()); + availableSlots, inputName, mainWorkName.isEmpty(), splitLocationProvider); if (mainWorkName.isEmpty() == false) { Multimap singleBucketToGroupedSplit = HashMultimap. create(); singleBucketToGroupedSplit.putAll(key, groupedSplit.values()); groupedSplit = grouper.group(jobConf, singleBucketToGroupedSplit, availableSlots, - HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES)); + HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES), null); secondLevelGroupingDone = true; } bucketToGroupedSplitMap.putAll(key, groupedSplit.values()); } processAllEvents(inputName, bucketToGroupedSplitMap, secondLevelGroupingDone); } else { + SplitLocationProvider splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG); // do not group across files in case of side work because there is only 1 KV reader per // grouped split. This would affect SMB joins where we want to find the smallest key in // all the bucket files. 
@@ -298,7 +301,7 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0])); Multimap groupedSplit = grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves, - availableSlots, inputName, false); + availableSlots, inputName, false, splitLocationProvider); bucketToGroupedSplitMap.putAll(key, groupedSplit.values()); } /* diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index 8ebfe69..8e48c2e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.split.SplitLocationProvider; import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper; import org.apache.hadoop.util.ReflectionUtils; import org.apache.tez.common.TezUtils; @@ -79,6 +80,7 @@ private final MRInputUserPayloadProto userPayloadProto; private final MapWork work; private final SplitGrouper splitGrouper = new SplitGrouper(); + private final SplitLocationProvider splitLocationProvider; public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException, SerDeException { @@ -91,6 +93,9 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE this.jobConf = new JobConf(conf); + this.splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG); + LOG.info("SplitLocationProvider: " + splitLocationProvider); + // Read all credentials into the credentials instance stored in JobConf. 
ShimLoader.getHadoopShims().getMergedCredentials(jobConf); @@ -149,6 +154,7 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES, TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT); + // Raw splits InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves)); // Sort the splits, so that subsequent grouping is consistent. Arrays.sort(splits, new InputSplitComparator()); @@ -160,10 +166,10 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE } Multimap groupedSplits = - splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots); + splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, splitLocationProvider); // And finally return them in a flat array InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]); - LOG.info("Number of grouped splits: " + flatSplits.length); + LOG.info("Number of split groups: " + flatSplits.length); List locationHints = splitGrouper.createTaskLocationHints(flatSplits, generateConsistentSplits); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java new file mode 100644 index 0000000..c06499e --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.split.SplitLocationProvider; +import org.apache.hive.common.util.Murmur3; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This maps a split (path + offset) to an index based on the number of locations provided. + * + * If locations do not change across jobs, the intention is to map the same split to the same node. + * + * A big problem is when nodes change (added, removed, temporarily removed and re-added) etc. That changes + * the number of locations / position of locations - and will cause the cache to be almost completely invalidated. + * + * TODO: Support for consistent hashing when combining the split location generator and the ServiceRegistry. 
+ * + */ +public class HostAffinitySplitLocationProvider implements SplitLocationProvider { + + private final Logger LOG = LoggerFactory.getLogger(HostAffinitySplitLocationProvider.class); + private final boolean isDebugEnabled = LOG.isDebugEnabled(); + + private final String[] knownLocations; + + public HostAffinitySplitLocationProvider(String[] knownLocations) { + Preconditions.checkState(knownLocations != null && knownLocations.length != 0, + HostAffinitySplitLocationProvider.class.getName() + + "needs at least 1 location to function"); + this.knownLocations = knownLocations; + } + + @Override + public String[] getLocations(InputSplit split) throws IOException { + if (split instanceof FileSplit) { + FileSplit fsplit = (FileSplit) split; + long hash = generateHash(fsplit.getPath().toString(), fsplit.getStart()); + int indexRaw = (int) (hash % knownLocations.length); + int index = Math.abs(indexRaw); + if (isDebugEnabled) { + LOG.debug( + "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + ", length=" + + fsplit.getLength() + " mapped to index=" + index + ", location=" + + knownLocations[index]); + } + return new String[]{knownLocations[index]}; + } else { + if (isDebugEnabled) { + LOG.debug("Split: " + split + " is not a FileSplit. Using default locations"); + } + return split.getLocations(); + } + } + + private long generateHash(String path, long startOffset) throws IOException { + // Explicitly using only the start offset of a split, and not the length. + // Splits generated on block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node. + // There is the drawback of potentially hashing the same data on multiple nodes though, when a large split + // is sent to 1 node, and a second invocation uses smaller chunks of the previous large split and send them + // to different nodes. 
+ DataOutputBuffer dob = new DataOutputBuffer(); + dob.writeLong(startOffset); + dob.writeUTF(path); + return Murmur3.hash64(dob.getData(), 0, dob.getLength()); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java index aaaa6a5..f4496df 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -42,6 +41,7 @@ import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.split.SplitLocationProvider; import org.apache.hadoop.mapred.split.TezGroupedSplit; import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper; import org.apache.tez.dag.api.TaskLocationHint; @@ -65,14 +65,13 @@ private final TezMapredSplitsGrouper tezGrouper = new TezMapredSplitsGrouper(); - - /** * group splits for each bucket separately - while evenly filling all the * available slots with tasks */ public Multimap group(Configuration conf, - Multimap bucketSplitMultimap, int availableSlots, float waves) + Multimap bucketSplitMultimap, int availableSlots, float waves, + SplitLocationProvider splitLocationProvider) throws IOException { // figure out how many tasks we want for each bucket @@ -90,9 +89,9 @@ InputSplit[] rawSplits = inputSplitCollection.toArray(new InputSplit[0]); InputSplit[] groupedSplits = tezGrouper.getGroupedSplits(conf, rawSplits, bucketTaskMap.get(bucketId), - HiveInputFormat.class.getName(), new ColumnarSplitSizeEstimator()); + HiveInputFormat.class.getName(), new ColumnarSplitSizeEstimator(), splitLocationProvider); - LOG.info("Original split size is " + rawSplits.length + " grouped split size 
is " + LOG.info("Original split count is " + rawSplits.length + " grouped split count is " + groupedSplits.length + ", for bucket: " + bucketId); for (InputSplit inSplit : groupedSplits) { @@ -155,9 +154,10 @@ public Multimap generateGroupedSplits(JobConf jobConf, Configuration conf, InputSplit[] splits, - float waves, int availableSlots) + float waves, int availableSlots, + SplitLocationProvider locationProvider) throws Exception { - return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true); + return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true, locationProvider); } /** Generate groups of splits, separated by schema evolution boundaries */ @@ -166,10 +166,12 @@ InputSplit[] splits, float waves, int availableSlots, String inputName, - boolean groupAcrossFiles) throws + boolean groupAcrossFiles, + SplitLocationProvider locationProvider) throws Exception { MapWork work = populateMapWork(jobConf, inputName); + // ArrayListMultimap is important here to retain the ordering for the splits. Multimap bucketSplitMultiMap = ArrayListMultimap. create(); @@ -188,7 +190,7 @@ // group them into the chunks we want Multimap groupedSplits = - this.group(jobConf, bucketSplitMultiMap, availableSlots, waves); + this.group(jobConf, bucketSplitMultiMap, availableSlots, waves, locationProvider); return groupedSplits; } @@ -207,6 +209,8 @@ // mapping of bucket id to number of required tasks to run Map bucketTaskMap = new HashMap(); + // TODO HIVE-12255. Make use of SplitSizeEstimator. + // The actual task computation needs to be looked at as well. 
// compute the total size per bucket long totalSize = 0; boolean earlyExit = false; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java new file mode 100644 index 0000000..3eb858b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.mapred.split.SplitLocationProvider; +import org.slf4j.Logger; + +public class Utils { + public static SplitLocationProvider getSplitLocationProvider(Configuration conf, Logger LOG) throws + IOException { + boolean useCustomLocations = + HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS); + SplitLocationProvider splitLocationProvider; + LOG.info("SplitGenerator using llap affinitized locations: " + useCustomLocations); + if (useCustomLocations) { + LlapRegistryService serviceRegistry; + serviceRegistry = LlapRegistryService.getClient(conf); + + List serviceInstances = + serviceRegistry.getInstances().getAllInstancesOrdered(); + String[] locations = new String[serviceInstances.size()]; + int i = 0; + for (ServiceInstance serviceInstance : serviceInstances) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" + + serviceInstance.getHost() + " to list for split locations"); + } + locations[i++] = serviceInstance.getHost(); + } + splitLocationProvider = new HostAffinitySplitLocationProvider(locations); + } else { + splitLocationProvider = null; + } + return splitLocationProvider; + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java 
ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
new file mode 100644
index 0000000..d98a5ff
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.junit.Test;
+
+/**
+ * Tests for {@link HostAffinitySplitLocationProvider}: splits that are not file splits keep the
+ * locations they report, while file splits are mapped to exactly one of the hosts the provider
+ * was constructed with, and that mapping is stable across repeated calls.
+ */
+public class TestHostAffinitySplitLocationProvider {
+
+  /** Hosts the mocked splits report as their own locations. */
+  private static final String[] locations = new String[5];
+  private static final Set<String> locationsSet = new HashSet<>();
+  /** Hosts handed to the provider; disjoint from {@link #locations} by construction. */
+  private static final String[] executorLocations = new String[9];
+  private static final Set<String> executorLocationsSet = new HashSet<>();
+
+  static {
+    for (int i = 0; i < locations.length; i++) {
+      locations[i] = "location" + i;
+      locationsSet.add(locations[i]);
+    }
+
+    for (int i = 0; i < executorLocations.length; i++) {
+      executorLocations[i] = "execLocation" + i;
+      executorLocationsSet.add(executorLocations[i]);
+    }
+  }
+
+  /** A split that is not a file split must be returned with its own locations untouched. */
+  @Test (timeout = 5000)
+  public void testNonFileSplits() throws IOException {
+    HostAffinitySplitLocationProvider locationProvider =
+        new HostAffinitySplitLocationProvider(executorLocations);
+
+    InputSplit inputSplit1 = createMockInputSplit(new String[] {locations[0], locations[1]});
+    InputSplit inputSplit2 = createMockInputSplit(new String[] {locations[2], locations[3]});
+
+    assertArrayEquals(new String[] {locations[0], locations[1]},
+        locationProvider.getLocations(inputSplit1));
+    assertArrayEquals(new String[] {locations[2], locations[3]},
+        locationProvider.getLocations(inputSplit2));
+  }
+
+  /** Every ORC split must be assigned a single executor host instead of its own locations. */
+  @Test (timeout = 5000)
+  public void testOrcSplitsBasic() throws IOException {
+    HostAffinitySplitLocationProvider locationProvider =
+        new HostAffinitySplitLocationProvider(executorLocations);
+
+    InputSplit os1 = createMockFileSplit(true, "path1", 0, 1000,
+        new String[] {locations[0], locations[1]});
+    InputSplit os2 = createMockFileSplit(true, "path2", 0, 2000,
+        new String[] {locations[2], locations[3]});
+    InputSplit os3 = createMockFileSplit(true, "path3", 1000, 2000,
+        new String[] {locations[0], locations[3]});
+
+    String[] retLoc1 = locationProvider.getLocations(os1);
+    String[] retLoc2 = locationProvider.getLocations(os2);
+    String[] retLoc3 = locationProvider.getLocations(os3);
+
+    assertSingleExecutorLocation(retLoc1);
+    assertSingleExecutorLocation(retLoc2);
+    assertSingleExecutorLocation(retLoc3);
+  }
+
+  /**
+   * Splits with the same path and start offset must land on the same host regardless of length,
+   * a different start offset in this fixture must land on another host, and repeated lookups
+   * must return the same answer.
+   */
+  @Test (timeout = 5000)
+  public void testOrcSplitsLocationAffinity() throws IOException {
+    HostAffinitySplitLocationProvider locationProvider =
+        new HostAffinitySplitLocationProvider(executorLocations);
+
+    // Same file, offset, different lengths
+    InputSplit os11 = createMockFileSplit(true, "path1", 0, 15000,
+        new String[] {locations[0], locations[1]});
+    InputSplit os12 = createMockFileSplit(true, "path1", 0, 30000,
+        new String[] {locations[0], locations[1]});
+    // Same file, different offset
+    InputSplit os13 = createMockFileSplit(true, "path1", 15000, 30000,
+        new String[] {locations[0], locations[1]});
+
+    String[] retLoc11 = locationProvider.getLocations(os11);
+    String[] retLoc12 = locationProvider.getLocations(os12);
+    String[] retLoc13 = locationProvider.getLocations(os13);
+
+    assertSingleExecutorLocation(retLoc11);
+    assertSingleExecutorLocation(retLoc12);
+    assertSingleExecutorLocation(retLoc13);
+
+    // Verify the actual locations being correct.
+    // os13 should be on a different location. Splits are supposed to be consistent across JVMs,
+    // the test is set up to verify a different host (make sure not to hash to the same host as
+    // os11, os12). If the test were to fail because the host is the same, the assumption about
+    // consistency across JVM instances is likely incorrect.
+    assertEquals(retLoc11[0], retLoc12[0]);
+    assertNotEquals(retLoc11[0], retLoc13[0]);
+
+    // Get locations again, and make sure they're the same.
+    String[] retLoc112 = locationProvider.getLocations(os11);
+    String[] retLoc122 = locationProvider.getLocations(os12);
+    String[] retLoc132 = locationProvider.getLocations(os13);
+    assertArrayEquals(retLoc11, retLoc112);
+    assertArrayEquals(retLoc12, retLoc122);
+    assertArrayEquals(retLoc13, retLoc132);
+  }
+
+  /**
+   * Asserts that the provider returned exactly one host, and that it is one of the executor
+   * hosts rather than one of the locations reported by the split.
+   */
+  private static void assertSingleExecutorLocation(String[] returnedLocations) {
+    assertEquals(1, returnedLocations.length);
+    assertFalse(locationsSet.contains(returnedLocations[0]));
+    assertTrue(executorLocationsSet.contains(returnedLocations[0]));
+  }
+
+  /** Builds a mocked non-file split that only reports the given locations. */
+  private InputSplit createMockInputSplit(String[] locations) throws IOException {
+    InputSplit inputSplit = mock(InputSplit.class);
+    doReturn(locations).when(inputSplit).getLocations();
+    return inputSplit;
+  }
+
+  /**
+   * Builds a mocked file split (an {@link OrcSplit} mock when {@code createOrcSplit} is set,
+   * otherwise a plain {@link FileSplit} mock) reporting the given path, start, length and
+   * locations.
+   */
+  private InputSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start,
+      long length, String[] locations) throws IOException {
+    FileSplit fileSplit;
+    if (createOrcSplit) {
+      fileSplit = mock(OrcSplit.class);
+    } else {
+      fileSplit = mock(FileSplit.class);
+    }
+
+    doReturn(start).when(fileSplit).getStart();
+    doReturn(length).when(fileSplit).getLength();
+    doReturn(new Path(fakePathString)).when(fileSplit).getPath();
+    doReturn(locations).when(fileSplit).getLocations();
+    return fileSplit;
+  }
+}