diff --git llap-server/pom.xml llap-server/pom.xml
index 22ed693..9325bd9 100644
--- llap-server/pom.xml
+++ llap-server/pom.xml
@@ -184,6 +184,12 @@
       <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
       <artifactId>tez-dag</artifactId>
       <version>${tez.version}</version>
       <optional>true</optional>
diff --git llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index 2caca26..3ca2640 100644
--- llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -8,6 +8,88 @@ private LlapDaemonProtocolProtos() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
+ /**
+ * Protobuf enum {@code SourceStateProto}
+ */
+ public enum SourceStateProto
+ implements com.google.protobuf.ProtocolMessageEnum {
+ /**
+ * S_SUCCEEDED = 1;
+ */
+ S_SUCCEEDED(0, 1),
+ /**
+ * S_RUNNING = 2;
+ */
+ S_RUNNING(1, 2),
+ ;
+
+ /**
+ * S_SUCCEEDED = 1;
+ */
+ public static final int S_SUCCEEDED_VALUE = 1;
+ /**
+ * S_RUNNING = 2;
+ */
+ public static final int S_RUNNING_VALUE = 2;
+
+
+ public final int getNumber() { return value; }
+
+ public static SourceStateProto valueOf(int value) {
+ switch (value) {
+ case 1: return S_SUCCEEDED;
+ case 2: return S_RUNNING;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap<SourceStateProto>
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap<SourceStateProto>
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap<SourceStateProto>() {
+ public SourceStateProto findValueByNumber(int number) {
+ return SourceStateProto.valueOf(number);
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(index);
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor().getEnumTypes().get(0);
+ }
+
+ private static final SourceStateProto[] VALUES = values();
+
+ public static SourceStateProto valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int index;
+ private final int value;
+
+ private SourceStateProto(int index, int value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:SourceStateProto)
+ }
+
public interface UserPayloadProtoOrBuilder
extends com.google.protobuf.MessageOrBuilder {
@@ -7434,134 +7516,1280 @@ public Builder mergeFrom(
// @@protoc_insertion_point(class_scope:SubmitWorkResponseProto)
}
- /**
- * Protobuf service {@code LlapDaemonProtocol}
- */
- public static abstract class LlapDaemonProtocol
- implements com.google.protobuf.Service {
- protected LlapDaemonProtocol() {}
+ public interface SourceStateUpdatedRequestProtoOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
- public interface Interface {
- /**
- * rpc submitWork(.SubmitWorkRequestProto) returns (.SubmitWorkResponseProto);
- */
- public abstract void submitWork(
- com.google.protobuf.RpcController controller,
- org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request,
- com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto> done);
+ // optional string dag_name = 1;
+ /**
+ * optional string dag_name = 1;
+ */
+ boolean hasDagName();
+ /**
+ * optional string dag_name = 1;
+ */
+ java.lang.String getDagName();
+ /**
+ * optional string dag_name = 1;
+ */
+ com.google.protobuf.ByteString
+ getDagNameBytes();
- }
+ // optional string src_name = 2;
+ /**
+ * optional string src_name = 2;
+ */
+ boolean hasSrcName();
+ /**
+ * optional string src_name = 2;
+ */
+ java.lang.String getSrcName();
+ /**
+ * optional string src_name = 2;
+ */
+ com.google.protobuf.ByteString
+ getSrcNameBytes();
- public static com.google.protobuf.Service newReflectiveService(
- final Interface impl) {
- return new LlapDaemonProtocol() {
- @java.lang.Override
- public void submitWork(
- com.google.protobuf.RpcController controller,
- org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request,
- com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto> done) {
- impl.submitWork(controller, request, done);
- }
+ // optional .SourceStateProto state = 3;
+ /**
+ * optional .SourceStateProto state = 3;
+ */
+ boolean hasState();
+ /**
+ * optional .SourceStateProto state = 3;
+ */
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto getState();
+ }
+ /**
+ * Protobuf type {@code SourceStateUpdatedRequestProto}
+ */
+ public static final class SourceStateUpdatedRequestProto extends
+ com.google.protobuf.GeneratedMessage
+ implements SourceStateUpdatedRequestProtoOrBuilder {
+ // Use SourceStateUpdatedRequestProto.newBuilder() to construct.
+ private SourceStateUpdatedRequestProto(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private SourceStateUpdatedRequestProto(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
- };
+ private static final SourceStateUpdatedRequestProto defaultInstance;
+ public static SourceStateUpdatedRequestProto getDefaultInstance() {
+ return defaultInstance;
}
- public static com.google.protobuf.BlockingService
- newReflectiveBlockingService(final BlockingInterface impl) {
- return new com.google.protobuf.BlockingService() {
- public final com.google.protobuf.Descriptors.ServiceDescriptor
- getDescriptorForType() {
- return getDescriptor();
- }
+ public SourceStateUpdatedRequestProto getDefaultInstanceForType() {
+ return defaultInstance;
+ }
- public final com.google.protobuf.Message callBlockingMethod(
- com.google.protobuf.Descriptors.MethodDescriptor method,
- com.google.protobuf.RpcController controller,
- com.google.protobuf.Message request)
- throws com.google.protobuf.ServiceException {
- if (method.getService() != getDescriptor()) {
- throw new java.lang.IllegalArgumentException(
- "Service.callBlockingMethod() given method descriptor for " +
- "wrong service type.");
- }
- switch(method.getIndex()) {
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private SourceStateUpdatedRequestProto(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
case 0:
- return impl.submitWork(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto)request);
- default:
- throw new java.lang.AssertionError("Can't get here.");
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 10: {
+ bitField0_ |= 0x00000001;
+ dagName_ = input.readBytes();
+ break;
+ }
+ case 18: {
+ bitField0_ |= 0x00000002;
+ srcName_ = input.readBytes();
+ break;
+ }
+ case 24: {
+ int rawValue = input.readEnum();
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto value = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto.valueOf(rawValue);
+ if (value == null) {
+ unknownFields.mergeVarintField(3, rawValue);
+ } else {
+ bitField0_ |= 0x00000004;
+ state_ = value;
+ }
+ break;
+ }
}
}
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedRequestProto_descriptor;
+ }
- public final com.google.protobuf.Message
- getRequestPrototype(
- com.google.protobuf.Descriptors.MethodDescriptor method) {
- if (method.getService() != getDescriptor()) {
- throw new java.lang.IllegalArgumentException(
- "Service.getRequestPrototype() given method " +
- "descriptor for wrong service type.");
- }
- switch(method.getIndex()) {
- case 0:
- return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto.getDefaultInstance();
- default:
- throw new java.lang.AssertionError("Can't get here.");
- }
- }
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.Builder.class);
+ }
- public final com.google.protobuf.Message
- getResponsePrototype(
- com.google.protobuf.Descriptors.MethodDescriptor method) {
- if (method.getService() != getDescriptor()) {
- throw new java.lang.IllegalArgumentException(
- "Service.getResponsePrototype() given method " +
- "descriptor for wrong service type.");
- }
- switch(method.getIndex()) {
- case 0:
- return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance();
- default:
- throw new java.lang.AssertionError("Can't get here.");
- }
- }
+ public static com.google.protobuf.Parser<SourceStateUpdatedRequestProto> PARSER =
+ new com.google.protobuf.AbstractParser<SourceStateUpdatedRequestProto>() {
+ public SourceStateUpdatedRequestProto parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new SourceStateUpdatedRequestProto(input, extensionRegistry);
+ }
+ };
- };
+ @java.lang.Override
+ public com.google.protobuf.Parser<SourceStateUpdatedRequestProto> getParserForType() {
+ return PARSER;
}
+ private int bitField0_;
+ // optional string dag_name = 1;
+ public static final int DAG_NAME_FIELD_NUMBER = 1;
+ private java.lang.Object dagName_;
/**
- * rpc submitWork(.SubmitWorkRequestProto) returns (.SubmitWorkResponseProto);
+ * optional string dag_name = 1;
*/
- public abstract void submitWork(
- com.google.protobuf.RpcController controller,
- org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request,
- com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto> done);
-
- public static final
- com.google.protobuf.Descriptors.ServiceDescriptor
- getDescriptor() {
- return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor().getServices().get(0);
- }
- public final com.google.protobuf.Descriptors.ServiceDescriptor
- getDescriptorForType() {
- return getDescriptor();
+ public boolean hasDagName() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
}
-
- public final void callMethod(
- com.google.protobuf.Descriptors.MethodDescriptor method,
- com.google.protobuf.RpcController controller,
- com.google.protobuf.Message request,
- com.google.protobuf.RpcCallback<
- com.google.protobuf.Message> done) {
- if (method.getService() != getDescriptor()) {
- throw new java.lang.IllegalArgumentException(
- "Service.callMethod() given method descriptor for wrong " +
- "service type.");
+ /**
+ * optional string dag_name = 1;
+ */
+ public java.lang.String getDagName() {
+ java.lang.Object ref = dagName_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ dagName_ = s;
+ }
+ return s;
}
- switch(method.getIndex()) {
- case 0:
+ }
+ /**
+ * optional string dag_name = 1;
+ */
+ public com.google.protobuf.ByteString
+ getDagNameBytes() {
+ java.lang.Object ref = dagName_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ dagName_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // optional string src_name = 2;
+ public static final int SRC_NAME_FIELD_NUMBER = 2;
+ private java.lang.Object srcName_;
+ /**
+ * optional string src_name = 2;
+ */
+ public boolean hasSrcName() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * optional string src_name = 2;
+ */
+ public java.lang.String getSrcName() {
+ java.lang.Object ref = srcName_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ srcName_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * optional string src_name = 2;
+ */
+ public com.google.protobuf.ByteString
+ getSrcNameBytes() {
+ java.lang.Object ref = srcName_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ srcName_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // optional .SourceStateProto state = 3;
+ public static final int STATE_FIELD_NUMBER = 3;
+ private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto state_;
+ /**
+ * optional .SourceStateProto state = 3;
+ */
+ public boolean hasState() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * optional .SourceStateProto state = 3;
+ */
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto getState() {
+ return state_;
+ }
+
+ private void initFields() {
+ dagName_ = "";
+ srcName_ = "";
+ state_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(1, getDagNameBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, getSrcNameBytes());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeEnum(3, state_.getNumber());
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(1, getDagNameBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, getSrcNameBytes());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeEnumSize(3, state_.getNumber());
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto) obj;
+
+ boolean result = true;
+ result = result && (hasDagName() == other.hasDagName());
+ if (hasDagName()) {
+ result = result && getDagName()
+ .equals(other.getDagName());
+ }
+ result = result && (hasSrcName() == other.hasSrcName());
+ if (hasSrcName()) {
+ result = result && getSrcName()
+ .equals(other.getSrcName());
+ }
+ result = result && (hasState() == other.hasState());
+ if (hasState()) {
+ result = result &&
+ (getState() == other.getState());
+ }
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ private int memoizedHashCode = 0;
+ @java.lang.Override
+ public int hashCode() {
+ if (memoizedHashCode != 0) {
+ return memoizedHashCode;
+ }
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (hasDagName()) {
+ hash = (37 * hash) + DAG_NAME_FIELD_NUMBER;
+ hash = (53 * hash) + getDagName().hashCode();
+ }
+ if (hasSrcName()) {
+ hash = (37 * hash) + SRC_NAME_FIELD_NUMBER;
+ hash = (53 * hash) + getSrcName().hashCode();
+ }
+ if (hasState()) {
+ hash = (37 * hash) + STATE_FIELD_NUMBER;
+ hash = (53 * hash) + hashEnum(getState());
+ }
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ memoizedHashCode = hash;
+ return hash;
+ }
+
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code SourceStateUpdatedRequestProto}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProtoOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedRequestProto_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.Builder.class);
+ }
+
+ // Construct using org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ dagName_ = "";
+ bitField0_ = (bitField0_ & ~0x00000001);
+ srcName_ = "";
+ bitField0_ = (bitField0_ & ~0x00000002);
+ state_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedRequestProto_descriptor;
+ }
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto getDefaultInstanceForType() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto build() {
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto buildPartial() {
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.dagName_ = dagName_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.srcName_ = srcName_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.state_ = state_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto) {
+ return mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto other) {
+ if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.getDefaultInstance()) return this;
+ if (other.hasDagName()) {
+ bitField0_ |= 0x00000001;
+ dagName_ = other.dagName_;
+ onChanged();
+ }
+ if (other.hasSrcName()) {
+ bitField0_ |= 0x00000002;
+ srcName_ = other.srcName_;
+ onChanged();
+ }
+ if (other.hasState()) {
+ setState(other.getState());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // optional string dag_name = 1;
+ private java.lang.Object dagName_ = "";
+ /**
+ * optional string dag_name = 1;
+ */
+ public boolean hasDagName() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * optional string dag_name = 1;
+ */
+ public java.lang.String getDagName() {
+ java.lang.Object ref = dagName_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ dagName_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * optional string dag_name = 1;
+ */
+ public com.google.protobuf.ByteString
+ getDagNameBytes() {
+ java.lang.Object ref = dagName_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ dagName_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * optional string dag_name = 1;
+ */
+ public Builder setDagName(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ dagName_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string dag_name = 1;
+ */
+ public Builder clearDagName() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ dagName_ = getDefaultInstance().getDagName();
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string dag_name = 1;
+ */
+ public Builder setDagNameBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ dagName_ = value;
+ onChanged();
+ return this;
+ }
+
+ // optional string src_name = 2;
+ private java.lang.Object srcName_ = "";
+ /**
+ * optional string src_name = 2;
+ */
+ public boolean hasSrcName() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * optional string src_name = 2;
+ */
+ public java.lang.String getSrcName() {
+ java.lang.Object ref = srcName_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ srcName_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * optional string src_name = 2;
+ */
+ public com.google.protobuf.ByteString
+ getSrcNameBytes() {
+ java.lang.Object ref = srcName_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ srcName_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * optional string src_name = 2;
+ */
+ public Builder setSrcName(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ srcName_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string src_name = 2;
+ */
+ public Builder clearSrcName() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ srcName_ = getDefaultInstance().getSrcName();
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string src_name = 2;
+ */
+ public Builder setSrcNameBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ srcName_ = value;
+ onChanged();
+ return this;
+ }
+
+ // optional .SourceStateProto state = 3;
+ private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto state_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED;
+ /**
+ * optional .SourceStateProto state = 3;
+ */
+ public boolean hasState() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * optional .SourceStateProto state = 3;
+ */
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto getState() {
+ return state_;
+ }
+ /**
+ * optional .SourceStateProto state = 3;
+ */
+ public Builder setState(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ state_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional .SourceStateProto state = 3;
+ */
+ public Builder clearState() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ state_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED;
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:SourceStateUpdatedRequestProto)
+ }
+
+ static {
+ defaultInstance = new SourceStateUpdatedRequestProto(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:SourceStateUpdatedRequestProto)
+ }
+
+ public interface SourceStateUpdatedResponseProtoOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+ }
+ /**
+ * Protobuf type {@code SourceStateUpdatedResponseProto}
+ */
+ public static final class SourceStateUpdatedResponseProto extends
+ com.google.protobuf.GeneratedMessage
+ implements SourceStateUpdatedResponseProtoOrBuilder {
+ // Use SourceStateUpdatedResponseProto.newBuilder() to construct.
+ private SourceStateUpdatedResponseProto(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private SourceStateUpdatedResponseProto(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final SourceStateUpdatedResponseProto defaultInstance;
+ public static SourceStateUpdatedResponseProto getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public SourceStateUpdatedResponseProto getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private SourceStateUpdatedResponseProto(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedResponseProto_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedResponseProto_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.Builder.class);
+ }
+
+ public static com.google.protobuf.Parser<SourceStateUpdatedResponseProto> PARSER =
+ new com.google.protobuf.AbstractParser<SourceStateUpdatedResponseProto>() {
+ public SourceStateUpdatedResponseProto parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new SourceStateUpdatedResponseProto(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public com.google.protobuf.Parser<SourceStateUpdatedResponseProto> getParserForType() {
+ return PARSER;
+ }
+
+ private void initFields() {
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto) obj;
+
+ boolean result = true;
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ private int memoizedHashCode = 0;
+ @java.lang.Override
+ public int hashCode() {
+ if (memoizedHashCode != 0) {
+ return memoizedHashCode;
+ }
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ memoizedHashCode = hash;
+ return hash;
+ }
+
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code SourceStateUpdatedResponseProto}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProtoOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedResponseProto_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedResponseProto_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.Builder.class);
+ }
+
+ // Construct using org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedResponseProto_descriptor;
+ }
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto getDefaultInstanceForType() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto build() {
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto buildPartial() {
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto(this);
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto) {
+ return mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto other) {
+ if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance()) return this;
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:SourceStateUpdatedResponseProto)
+ }
+
+ static {
+ defaultInstance = new SourceStateUpdatedResponseProto(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:SourceStateUpdatedResponseProto)
+ }
+
+ /**
+ * Protobuf service {@code LlapDaemonProtocol}
+ */
+ public static abstract class LlapDaemonProtocol
+ implements com.google.protobuf.Service {
+ protected LlapDaemonProtocol() {}
+
+ public interface Interface {
+ /**
+ * rpc submitWork(.SubmitWorkRequestProto) returns (.SubmitWorkResponseProto);
+ */
+ public abstract void submitWork(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto> done);
+
+ /**
+ * rpc sourceStateUpdated(.SourceStateUpdatedRequestProto) returns (.SourceStateUpdatedResponseProto);
+ */
+ public abstract void sourceStateUpdated(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto> done);
+
+ }
+
+ public static com.google.protobuf.Service newReflectiveService(
+ final Interface impl) {
+ return new LlapDaemonProtocol() {
+ @java.lang.Override
+ public void submitWork(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto> done) {
+ impl.submitWork(controller, request, done);
+ }
+
+ @java.lang.Override
+ public void sourceStateUpdated(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto> done) {
+ impl.sourceStateUpdated(controller, request, done);
+ }
+
+ };
+ }
+
+ public static com.google.protobuf.BlockingService
+ newReflectiveBlockingService(final BlockingInterface impl) {
+ return new com.google.protobuf.BlockingService() {
+ public final com.google.protobuf.Descriptors.ServiceDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+
+ public final com.google.protobuf.Message callBlockingMethod(
+ com.google.protobuf.Descriptors.MethodDescriptor method,
+ com.google.protobuf.RpcController controller,
+ com.google.protobuf.Message request)
+ throws com.google.protobuf.ServiceException {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.callBlockingMethod() given method descriptor for " +
+ "wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return impl.submitWork(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto)request);
+ case 1:
+ return impl.sourceStateUpdated(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto)request);
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ public final com.google.protobuf.Message
+ getRequestPrototype(
+ com.google.protobuf.Descriptors.MethodDescriptor method) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.getRequestPrototype() given method " +
+ "descriptor for wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto.getDefaultInstance();
+ case 1:
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.getDefaultInstance();
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ public final com.google.protobuf.Message
+ getResponsePrototype(
+ com.google.protobuf.Descriptors.MethodDescriptor method) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.getResponsePrototype() given method " +
+ "descriptor for wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance();
+ case 1:
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance();
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ };
+ }
+
+ /**
+ * rpc submitWork(.SubmitWorkRequestProto) returns (.SubmitWorkResponseProto);
+ */
+ public abstract void submitWork(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto> done);
+
+ /**
+ * rpc sourceStateUpdated(.SourceStateUpdatedRequestProto) returns (.SourceStateUpdatedResponseProto);
+ */
+ public abstract void sourceStateUpdated(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto> done);
+
+ public static final
+ com.google.protobuf.Descriptors.ServiceDescriptor
+ getDescriptor() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor().getServices().get(0);
+ }
+ public final com.google.protobuf.Descriptors.ServiceDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+
+ public final void callMethod(
+ com.google.protobuf.Descriptors.MethodDescriptor method,
+ com.google.protobuf.RpcController controller,
+ com.google.protobuf.Message request,
+ com.google.protobuf.RpcCallback<
+ com.google.protobuf.Message> done) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.callMethod() given method descriptor for wrong " +
+ "service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
this.submitWork(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto)request,
com.google.protobuf.RpcUtil.specializeCallback(
done));
return;
+ case 1:
+ this.sourceStateUpdated(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto)request,
+ com.google.protobuf.RpcUtil.specializeCallback(
+ done));
+ return;
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -7578,6 +8806,8 @@ public final void callMethod(
switch(method.getIndex()) {
case 0:
return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto.getDefaultInstance();
+ case 1:
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -7594,6 +8824,8 @@ public final void callMethod(
switch(method.getIndex()) {
case 0:
return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance();
+ case 1:
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -7629,6 +8861,21 @@ public void submitWork(
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.class,
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance()));
}
+
+ public void sourceStateUpdated(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto> done) {
+ channel.callMethod(
+ getDescriptor().getMethods().get(1),
+ controller,
+ request,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance(),
+ com.google.protobuf.RpcUtil.generalizeCallback(
+ done,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.class,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance()));
+ }
}
public static BlockingInterface newBlockingStub(
@@ -7641,6 +8888,11 @@ public static BlockingInterface newBlockingStub(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request)
throws com.google.protobuf.ServiceException;
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto sourceStateUpdated(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request)
+ throws com.google.protobuf.ServiceException;
}
private static final class BlockingStub implements BlockingInterface {
@@ -7661,6 +8913,18 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) {
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance());
}
+
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto sourceStateUpdated(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request)
+ throws com.google.protobuf.ServiceException {
+ return (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto) channel.callBlockingMethod(
+ getDescriptor().getMethods().get(1),
+ controller,
+ request,
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance());
+ }
+
}
// @@protoc_insertion_point(class_scope:LlapDaemonProtocol)
@@ -7701,6 +8965,16 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) {
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_SubmitWorkResponseProto_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_SourceStateUpdatedRequestProto_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_SourceStateUpdatedResponseProto_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_SourceStateUpdatedResponseProto_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@@ -7736,11 +9010,18 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) {
"ary\030\005 \001(\014\022\014\n\004user\030\006 \001(\t\022\035\n\025application_i" +
"d_string\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001" +
"(\005\022)\n\rfragment_spec\030\t \001(\0132\022.FragmentSpec" +
- "Proto\"\031\n\027SubmitWorkResponseProto2U\n\022Llap" +
- "DaemonProtocol\022?\n\nsubmitWork\022\027.SubmitWor" +
- "kRequestProto\032\030.SubmitWorkResponseProtoB" +
- "H\n&org.apache.hadoop.hive.llap.daemon.rp",
- "cB\030LlapDaemonProtocolProtos\210\001\001\240\001\001"
+ "Proto\"\031\n\027SubmitWorkResponseProto\"f\n\036Sour" +
+ "ceStateUpdatedRequestProto\022\020\n\010dag_name\030\001" +
+ " \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(\0162\021." +
+ "SourceStateProto\"!\n\037SourceStateUpdatedRe",
+ "sponseProto*2\n\020SourceStateProto\022\017\n\013S_SUC" +
+ "CEEDED\020\001\022\r\n\tS_RUNNING\020\0022\256\001\n\022LlapDaemonPr" +
+ "otocol\022?\n\nsubmitWork\022\027.SubmitWorkRequest" +
+ "Proto\032\030.SubmitWorkResponseProto\022W\n\022sourc" +
+ "eStateUpdated\022\037.SourceStateUpdatedReques" +
+ "tProto\032 .SourceStateUpdatedResponseProto" +
+ "BH\n&org.apache.hadoop.hive.llap.daemon.r" +
+ "pcB\030LlapDaemonProtocolProtos\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -7789,6 +9070,18 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) {
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SubmitWorkResponseProto_descriptor,
new java.lang.String[] { });
+ internal_static_SourceStateUpdatedRequestProto_descriptor =
+ getDescriptor().getMessageTypes().get(7);
+ internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_SourceStateUpdatedRequestProto_descriptor,
+ new java.lang.String[] { "DagName", "SrcName", "State", });
+ internal_static_SourceStateUpdatedResponseProto_descriptor =
+ getDescriptor().getMessageTypes().get(8);
+ internal_static_SourceStateUpdatedResponseProto_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_SourceStateUpdatedResponseProto_descriptor,
+ new java.lang.String[] { });
return null;
}
};
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
index 0b1303d..82f3b59 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
@@ -16,9 +16,12 @@
import java.io.IOException;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
public interface ContainerRunner {
void submitWork(SubmitWorkRequestProto request) throws IOException;
+
+ void sourceStateUpdated(SourceStateUpdatedRequestProto request);
}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 986ba24..c9baba1 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -179,6 +179,9 @@ protected Void callInternal() {
}
amNodeInfo.stopUmbilical();
} else {
+ // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
+ amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval);
+ pendingHeartbeatQueeu.add(amNodeInfo);
executor.submit(new AMHeartbeatCallable(amNodeInfo));
}
} catch (InterruptedException e) {
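
For reference, the re-queue pattern in the hunk above (set the node's next heartbeat time, put it back on the queue, then schedule the actual heartbeat) is easiest to see in isolation. Below is a minimal, self-contained sketch of the same idea using java.util.concurrent.DelayQueue; AMNodeInfo here is a simplified stand-in, not the real class:

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    class HeartbeatDemo {
      static class AMNodeInfo implements Delayed {
        final String host;
        volatile long nextHeartbeatTime; // absolute time in millis

        AMNodeInfo(String host, long nextHeartbeatTime) {
          this.host = host;
          this.nextHeartbeatTime = nextHeartbeatTime;
        }

        @Override
        public long getDelay(TimeUnit unit) {
          return unit.convert(nextHeartbeatTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
          return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
      }

      public static void main(String[] args) throws InterruptedException {
        long heartbeatInterval = 1000L;
        DelayQueue<AMNodeInfo> queue = new DelayQueue<>();
        queue.add(new AMNodeInfo("am-host-1", System.currentTimeMillis() + heartbeatInterval));

        for (int i = 0; i < 3; i++) {
          AMNodeInfo node = queue.take(); // blocks until the next heartbeat is due
          System.out.println("Heartbeating " + node.host);
          // Add back to the queue for the next heartbeat, mirroring the hunk above.
          node.nextHeartbeatTime = System.currentTimeMillis() + heartbeatInterval;
          queue.add(node);
        }
      }
    }
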
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 989aad0..c142982 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -21,7 +21,10 @@
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,6 +42,8 @@
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
@@ -62,8 +67,11 @@
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult;
@@ -94,6 +102,9 @@
private final LlapDaemonExecutorMetrics metrics;
private final Configuration conf;
private final ConfParams confParams;
+
+ // Map of dagName to source vertex names and their last known state.
+ private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<>();
// TODO Support for removing queued containers, interrupting / killing specific containers
public ContainerRunnerImpl(Configuration conf, int numExecutors, String[] localDirsBase, int localShufflePort,
@@ -141,7 +152,8 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, String[] localD
@Override
public void serviceStart() {
// The node id will only be available at this point, since the server has been started in LlapDaemon
- LlapNodeId llapNodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort());
+ LlapNodeId llapNodeId = LlapNodeId.getInstance(localAddress.get().getHostName(),
+ localAddress.get().getPort());
this.amReporter = new AMReporter(llapNodeId, conf);
amReporter.init(conf);
amReporter.start();
@@ -172,7 +184,7 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException {
localAddress.get().getHostName(), request.getFragmentSpec().getDagName(),
request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(),
request.getFragmentSpec().getAttemptNumber());
- LOG.info("Queueing container for execution: " + stringifyRequest(request));
+ LOG.info("Queueing container for execution: " + stringifySubmitRequest(request));
// This is the start of container-annotated logging.
// TODO Reduce the length of this string. Way too verbose at the moment.
String ndcContextString =
@@ -215,9 +227,10 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException {
LOG.info("DEBUG: Registering request with the ShuffleHandler");
ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser(), localDirs);
+ ConcurrentMap<String, SourceStateProto> sourceCompletionMap = getSourceCompletionMap(request.getFragmentSpec().getDagName());
TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()),
new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
- credentials, memoryPerExecutor, amReporter, confParams);
+ credentials, memoryPerExecutor, amReporter, sourceCompletionMap, confParams);
ListenableFuture<ContainerExecutionResult> future = executorService.submit(callable);
Futures.addCallback(future, new TaskRunnerCallback(request, callable));
metrics.incrExecutorTotalRequestsHandled();
@@ -227,6 +240,13 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException {
}
}
+ @Override
+ public void sourceStateUpdated(SourceStateUpdatedRequestProto request) {
+ LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request));
+ ConcurrentMap<String, SourceStateProto> dagMap = getSourceCompletionMap(request.getDagName());
+ dagMap.put(request.getSrcName(), request.getState());
+ }
+
static class TaskRunnerCallable extends CallableWithNdc<ContainerExecutionResult> {
private final SubmitWorkRequestProto request;
@@ -241,6 +261,8 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException {
private final ConfParams confParams;
private final Token<JobTokenIdentifier> jobToken;
private final AMReporter amReporter;
+ private final ConcurrentMap<String, SourceStateProto> sourceCompletionMap;
+ private final TaskSpec taskSpec;
private volatile TezTaskRunner taskRunner;
private volatile TaskReporterInterface taskReporter;
private volatile ListeningExecutorService executor;
@@ -254,17 +276,20 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException {
TaskRunnerCallable(SubmitWorkRequestProto request, Configuration conf,
ExecutionContext executionContext, Map<String, String> envMap,
String[] localDirs, Credentials credentials,
- long memoryAvailable, AMReporter amReporter, ConfParams confParams) {
+ long memoryAvailable, AMReporter amReporter,
+ ConcurrentMap<String, SourceStateProto> sourceCompletionMap, ConfParams confParams) {
this.request = request;
this.conf = conf;
this.executionContext = executionContext;
this.envMap = envMap;
this.localDirs = localDirs;
this.objectRegistry = new ObjectRegistryImpl();
+ this.sourceCompletionMap = sourceCompletionMap;
this.credentials = credentials;
this.memoryAvailable = memoryAvailable;
this.confParams = confParams;
- jobToken = TokenCache.getSessionToken(credentials);
+ this.jobToken = TokenCache.getSessionToken(credentials);
+ this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec());
this.amReporter = amReporter;
// Register with the AMReporter when the callable is setup. Unregister once it starts running.
this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken);
@@ -274,6 +299,10 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException {
protected ContainerExecutionResult callInternal() throws Exception {
this.startTime = System.currentTimeMillis();
this.threadName = Thread.currentThread().getName();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+ }
+
// Unregister from the AMReporter, since the task is now running.
this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
@@ -320,7 +349,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception {
request.getContainerIdString());
taskRunner = new TezTaskRunner(conf, taskUgi, localDirs,
- Converters.getTaskSpecfromProto(request.getFragmentSpec()),
+ taskSpec,
request.getAppAttemptNumber(),
serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry,
pid,
@@ -346,10 +375,51 @@ public LlapTaskUmbilicalProtocol run() throws Exception {
}
LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
sw.stop().elapsedMillis());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+ }
+
return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
null);
}
+ /**
+ * Check whether a task can run to completion or may end up blocking on its sources.
+ * This is currently determined by looking up the last known state of each source.
+ * TODO: Eventually, this should look up the Hive Processor to figure out whether
+ * it has reached a state where it can finish - especially in cases of failures
+ * after data has been fetched.
+ * @return true if the task can finish without blocking on any of its sources
+ */
+ public boolean canFinish() {
+ List<InputSpec> inputSpecList = taskSpec.getInputs();
+ boolean canFinish = true;
+ if (inputSpecList != null && !inputSpecList.isEmpty()) {
+ for (InputSpec inputSpec : inputSpecList) {
+ if (isSourceOfInterest(inputSpec)) {
+ // Lookup the state in the map.
+ SourceStateProto state = sourceCompletionMap.get(inputSpec.getSourceVertexName());
+ if (state != null && state == SourceStateProto.S_SUCCEEDED) {
+ continue;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot finish due to source: " + inputSpec.getSourceVertexName());
+ }
+ canFinish = false;
+ break;
+ }
+ }
+ }
+ }
+ return canFinish;
+ }
+
+ private boolean isSourceOfInterest(InputSpec inputSpec) {
+ String inputClassName = inputSpec.getInputDescriptor().getClassName();
+ // MRInput is not of interest since it'll always be ready.
+ return !inputClassName.equals(MRInputLegacy.class.getName());
+ }
+
public void shutdown() {
executor.shutdownNow();
if (taskReporter != null) {
@@ -444,7 +514,15 @@ public ConfParams(int amHeartbeatIntervalMsMax, long amCounterHeartbeatInterval,
}
}
- private String stringifyRequest(SubmitWorkRequestProto request) {
+ private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("dagName=").append(request.getDagName())
+ .append(", ").append("sourceName=").append(request.getSrcName())
+ .append(", ").append("state=").append(request.getState());
+ return sb.toString();
+ }
+
+ private String stringifySubmitRequest(SubmitWorkRequestProto request) {
StringBuilder sb = new StringBuilder();
sb.append("am_details=").append(request.getAmHost()).append(":").append(request.getAmPort());
sb.append(", user=").append(request.getUser());
@@ -462,25 +540,40 @@ private String stringifyRequest(SubmitWorkRequestProto request) {
sb.append(", Inputs={");
if (fragmentSpec.getInputSpecsCount() > 0) {
for (IOSpecProto ioSpec : fragmentSpec.getInputSpecsList()) {
- sb.append("{").append(ioSpec.getConnectedVertexName()).append(",").append(ioSpec.getIoDescriptor().getClassName()).append(",").append(ioSpec.getPhysicalEdgeCount()).append("}");
+ sb.append("{").append(ioSpec.getConnectedVertexName()).append(",")
+ .append(ioSpec.getIoDescriptor().getClassName()).append(",")
+ .append(ioSpec.getPhysicalEdgeCount()).append("}");
}
}
sb.append("}");
sb.append(", Outputs={");
if (fragmentSpec.getOutputSpecsCount() > 0) {
for (IOSpecProto ioSpec : fragmentSpec.getOutputSpecsList()) {
- sb.append("{").append(ioSpec.getConnectedVertexName()).append(",").append(ioSpec.getIoDescriptor().getClassName()).append(",").append(ioSpec.getPhysicalEdgeCount()).append("}");
+ sb.append("{").append(ioSpec.getConnectedVertexName()).append(",")
+ .append(ioSpec.getIoDescriptor().getClassName()).append(",")
+ .append(ioSpec.getPhysicalEdgeCount()).append("}");
}
}
sb.append("}");
sb.append(", GroupedInputs={");
if (fragmentSpec.getGroupedInputSpecsCount() > 0) {
for (GroupInputSpecProto group : fragmentSpec.getGroupedInputSpecsList()) {
- sb.append("{").append("groupName=").append(group.getGroupName()).append(", elements=").append(group.getGroupVerticesList()).append("}");
+ sb.append("{").append("groupName=").append(group.getGroupName()).append(", elements=")
+ .append(group.getGroupVerticesList()).append("}");
- sb.append(group.getGroupVerticesList());
}
}
sb.append("}");
return sb.toString();
}
+
+ private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(String dagName) {
+ ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(dagName);
+ if (dagMap == null) {
+ dagMap = new ConcurrentHashMap<>();
+ ConcurrentMap<String, SourceStateProto> old = sourceCompletionMap.putIfAbsent(dagName, dagMap);
+ dagMap = (old != null) ? old : dagMap;
+ }
+ return dagMap;
+ }
}
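
The per-DAG map above uses the classic putIfAbsent idiom: create a candidate map, try to publish it, and adopt whichever map won the race. A minimal standalone sketch of the same idiom (SourceState here is a stand-in enum, not the generated proto type); on Java 8+, ConcurrentMap.computeIfAbsent would express this in one call:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class SourceCompletionDemo {
      enum SourceState { S_SUCCEEDED, S_RUNNING }

      private final ConcurrentMap<String, ConcurrentMap<String, SourceState>> byDag =
          new ConcurrentHashMap<>();

      ConcurrentMap<String, SourceState> forDag(String dagName) {
        ConcurrentMap<String, SourceState> dagMap = byDag.get(dagName);
        if (dagMap == null) {
          dagMap = new ConcurrentHashMap<>();
          // Only one thread wins the race; everyone else adopts the winner's map.
          ConcurrentMap<String, SourceState> old = byDag.putIfAbsent(dagName, dagMap);
          dagMap = (old != null) ? old : dagMap;
        }
        return dagMap;
      }

      public static void main(String[] args) {
        SourceCompletionDemo demo = new SourceCompletionDemo();
        demo.forDag("dag_1").put("Map 1", SourceState.S_SUCCEEDED);
        // A fragment whose only non-MRInput source is "Map 1" could now finish.
        System.out.println(demo.forDag("dag_1").get("Map 1") == SourceState.S_SUCCEEDED);
      }
    }
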
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index cea7692..eb8d64b 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -242,6 +242,11 @@ public void submitWork(LlapDaemonProtocolProtos.SubmitWorkRequestProto request)
containerRunner.submitWork(request);
}
+ @Override
+ public void sourceStateUpdated(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request) {
+ containerRunner.sourceStateUpdated(request);
+ }
+
@VisibleForTesting
public long getNumSubmissions() {
return numSubmissions.get();
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
index ae74ae8..01b53c2 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
@@ -20,7 +20,10 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
@@ -42,8 +45,8 @@ public LlapDaemonProtocolClientImpl(Configuration conf, String hostname, int por
}
@Override
- public LlapDaemonProtocolProtos.SubmitWorkResponseProto submitWork(RpcController controller,
- LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws
+ public SubmitWorkResponseProto submitWork(RpcController controller,
+ SubmitWorkRequestProto request) throws
ServiceException {
try {
return getProxy().submitWork(null, request);
@@ -52,6 +55,17 @@ public LlapDaemonProtocolClientImpl(Configuration conf, String hostname, int por
}
}
+ @Override
+ public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
+ SourceStateUpdatedRequestProto request) throws
+ ServiceException {
+ try {
+ return getProxy().sourceStateUpdated(null, request);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
public LlapDaemonProtocolBlockingPB getProxy() throws IOException {
if (proxy == null) {
proxy = createProxy();
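
One note on getProxy() above: it creates the proxy on first use. If it can be called from multiple threads and the proxy field is not volatile or guarded by a lock (not visible in this hunk), two proxies could race into existence. A self-contained sketch of a thread-safe variant using double-checked locking; LazyProxyHolder and Factory are illustrative names, not part of the patch:

    class LazyProxyHolder<T> {
      interface Factory<R> { R create() throws java.io.IOException; }

      private final Factory<T> factory;
      private volatile T proxy;

      LazyProxyHolder(Factory<T> factory) { this.factory = factory; }

      T get() throws java.io.IOException {
        T result = proxy;
        if (result == null) {
          synchronized (this) {          // double-checked locking over a volatile field
            result = proxy;
            if (result == null) {
              proxy = result = factory.create();
            }
          }
        }
        return result;
      }
    }
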
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
index 5ea11fd..0360a27 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
@@ -27,6 +27,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
@@ -62,7 +65,7 @@ public LlapDaemonProtocolServerImpl(int numHandlers,
@Override
public SubmitWorkResponseProto submitWork(RpcController controller,
- LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws
+ SubmitWorkRequestProto request) throws
ServiceException {
try {
containerRunner.submitWork(request);
@@ -73,6 +76,14 @@ public SubmitWorkResponseProto submitWork(RpcController controller,
}
@Override
+ public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
+ SourceStateUpdatedRequestProto request) throws
+ ServiceException {
+ containerRunner.sourceStateUpdated(request);
+ return SourceStateUpdatedResponseProto.getDefaultInstance();
+ }
+
+ @Override
public void serviceStart() {
Configuration conf = getConfig();
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
index 7e06f2b..5bd1fe9 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.EntityDescriptor;
@@ -31,6 +32,7 @@
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
@@ -250,4 +252,15 @@ private static UserPayload convertPayloadFromProto(
return userPayload;
}
+ public static SourceStateProto fromVertexState(VertexState state) {
+ switch (state) {
+ case SUCCEEDED:
+ return SourceStateProto.S_SUCCEEDED;
+ case RUNNING:
+ return SourceStateProto.S_RUNNING;
+ default:
+ throw new RuntimeException("Unexpected state: " + state);
+ }
+ }
+
}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 0677de1..e1610fe 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -31,9 +31,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -51,6 +54,7 @@
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -64,11 +68,16 @@
private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
private final ConcurrentMap<String, Credentials> credentialMap;
+ // Tracks containerIds and taskAttemptIds, so it can be kept independent of the running DAG.
+ // Once DAG-specific cleanup is in place, it would be better to link this to a DAG.
private final EntityTracker entityTracker = new EntityTracker();
+ private final SourceStateTracker sourceStateTracker;
private TaskCommunicator communicator;
private final LlapTaskUmbilicalProtocol umbilical;
+ private volatile String currentDagName;
+
public LlapTaskCommunicator(
TaskCommunicatorContext taskCommunicatorContext) {
super(taskCommunicatorContext);
@@ -87,6 +96,7 @@ public LlapTaskCommunicator(
BASE_SUBMIT_WORK_REQUEST = baseBuilder.build();
credentialMap = new ConcurrentHashMap<>();
+ sourceStateTracker = new SourceStateTracker(getTaskCommunicatorContext(), this);
}
@Override
@@ -153,6 +163,7 @@ public void registerContainerEnd(ContainerId containerId) {
entityTracker.unregisterContainer(containerId);
}
+
@Override
public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec,
Map<String, LocalResource> additionalResources,
@@ -161,6 +172,9 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task
int priority) {
super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
credentialsChanged, priority);
+ if (!taskSpec.getDAGName().equals(currentDagName)) {
+ resetCurrentDag(taskSpec.getDAGName());
+ }
SubmitWorkRequestProto requestProto;
try {
@@ -184,6 +198,8 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task
entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port);
+ sourceStateTracker.addTask(host, port, taskSpec.getInputs());
+
// Have to register this up front right now. Otherwise, it's possible for the task to start
// sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
getTaskCommunicatorContext()
@@ -240,6 +256,45 @@ public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
// be told that it needs to die since it isn't recognized.
}
+ @Override
+ public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
+ // Delegate updates over to the source state tracker.
+ sourceStateTracker
+ .sourceStateUpdated(vertexStateUpdate.getVertexName(), vertexStateUpdate.getVertexState());
+ }
+
+ public void sendStateUpdate(final String host, final int port,
+ final SourceStateUpdatedRequestProto request) {
+ communicator.sendSourceStateUpdate(request, host, port,
+ new TaskCommunicator.ExecuteRequestCallback<SourceStateUpdatedResponseProto>() {
+ @Override
+ public void setResponse(SourceStateUpdatedResponseProto response) {
+ }
+
+ @Override
+ public void indicateError(Throwable t) {
+ // TODO HIVE-10280.
+ // Ideally, this should be retried for a while, after which the node should be marked as failed.
+ // Considering that tasks are supposed to run fast, failing the task immediately may be a good option.
+ LOG.error(
+ "Failed to send state update to node: " + host + ":" + port + ", StateUpdate=" +
+ request, t);
+ }
+ });
+ }
+
+
+
+ private void resetCurrentDag(String newDagName) {
+ // Working on the assumption that a single DAG runs at a time per AM.
+ currentDagName = newDagName;
+ sourceStateTracker.resetState(newDagName);
+ LOG.info("CurrentDag set to: " + newDagName);
+ // TODO Additional state reset. Potentially sending messages to node to reset.
+ // Heartbeats may still come in from lost tasks; those should be told to die, which
+ // is likely already happening.
+ }
+
private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId,
TaskSpec taskSpec) throws
IOException {
@@ -411,4 +466,4 @@ void nodePinged(String hostname, int port) {
}
}
}
-}
+}
\ No newline at end of file
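
The TODO in indicateError above (HIVE-10280) leaves the retry behavior open. One possible shape, sketched standalone with hypothetical names (sendOnce, markNodeFailed are illustrative, not part of the patch): bounded attempts with linear backoff, then mark the node as failed.

    final class StateUpdateRetry {
      interface Sender { void sendOnce() throws Exception; }

      static void sendWithRetry(Sender sender, int maxAttempts, long backoffMillis,
          Runnable markNodeFailed) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
          try {
            sender.sendOnce();
            return;
          } catch (Exception e) {
            if (attempt == maxAttempts) {
              markNodeFailed.run(); // give up and treat the node as bad
              return;
            }
            try {
              Thread.sleep(backoffMillis * attempt); // linear backoff between attempts
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              return;
            }
          }
        }
      }
    }
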
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
index 1670a48..3b4612d 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
@@ -27,14 +27,20 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.service.AbstractService;
public class TaskCommunicator extends AbstractService {
+ private static final Log LOG = LogFactory.getLog(TaskCommunicator.class);
+
private final ConcurrentMap<String, LlapDaemonProtocolBlockingPB> hostProxies;
private ListeningExecutorService executor;
@@ -53,7 +59,7 @@ public void serviceStop() {
public void submitWork(SubmitWorkRequestProto request, String host, int port,
final ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
- ListenableFuture<SubmitWorkResponseProto> future = executor.submit(new SubmitWorkCallable(request, host, port));
+ ListenableFuture<SubmitWorkResponseProto> future = executor.submit(new SubmitWorkCallable(host, port, request));
Futures.addCallback(future, new FutureCallback<SubmitWorkResponseProto>() {
@Override
public void onSuccess(SubmitWorkResponseProto result) {
@@ -68,23 +74,66 @@ public void onFailure(Throwable t) {
}
- private class SubmitWorkCallable implements Callable<SubmitWorkResponseProto> {
+ public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host, final int port,
+ final ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) {
+ ListenableFuture<SourceStateUpdatedResponseProto> future =
+ executor.submit(new SendSourceStateUpdateCallable(host, port, request));
+ Futures.addCallback(future, new FutureCallback<SourceStateUpdatedResponseProto>() {
+ @Override
+ public void onSuccess(SourceStateUpdatedResponseProto result) {
+ callback.setResponse(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callback.indicateError(t);
+ }
+ });
+ }
+
+ private static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message> implements Callable<RESPONSE> {
+
final String hostname;
final int port;
- final SubmitWorkRequestProto request;
+ final REQUEST request;
- private SubmitWorkCallable(SubmitWorkRequestProto request, String hostname, int port) {
+
+ protected CallableRequest(String hostname, int port, REQUEST request) {
this.hostname = hostname;
this.port = port;
this.request = request;
}
+ public abstract RESPONSE call() throws Exception;
+ }
+
+ private class SubmitWorkCallable extends CallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> {
+
+ protected SubmitWorkCallable(String hostname, int port,
+ SubmitWorkRequestProto submitWorkRequestProto) {
+ super(hostname, port, submitWorkRequestProto);
+ }
+
@Override
public SubmitWorkResponseProto call() throws Exception {
return getProxy(hostname, port).submitWork(null, request);
}
}
+ private class SendSourceStateUpdateCallable
+ extends CallableRequest<SourceStateUpdatedRequestProto, SourceStateUpdatedResponseProto> {
+
+ public SendSourceStateUpdateCallable(String hostname, int port,
+ SourceStateUpdatedRequestProto request) {
+ super(hostname, port, request);
+ }
+
+ @Override
+ public SourceStateUpdatedResponseProto call() throws Exception {
+ return getProxy(hostname, port).sourceStateUpdated(null, request);
+ }
+ }
+
public interface ExecuteRequestCallback<T extends Message> {
void setResponse(T response);
void indicateError(Throwable t);
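
The refactor above extracts CallableRequest so that each RPC only supplies a call() body. A self-contained toy version of the same pattern (EchoCallable stands in for a real RPC callable; none of these names are in the patch):

    import java.util.concurrent.Callable;

    class CallableRequestDemo {
      static abstract class CallableRequest<REQ, RESP> implements Callable<RESP> {
        final String hostname;
        final int port;
        final REQ request;

        CallableRequest(String hostname, int port, REQ request) {
          this.hostname = hostname;
          this.port = port;
          this.request = request;
        }
      }

      // A toy "RPC": uppercases the request string instead of calling a proxy.
      static class EchoCallable extends CallableRequest<String, String> {
        EchoCallable(String hostname, int port, String request) {
          super(hostname, port, request);
        }

        @Override
        public String call() {
          return hostname + ":" + port + " -> " + request.toUpperCase();
        }
      }

      public static void main(String[] args) throws Exception {
        System.out.println(new EchoCallable("node1", 15001, "ping").call());
      }
    }
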
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
new file mode 100644
index 0000000..698de76
--- /dev/null
+++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.helpers;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.tezplugins.Converters;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.runtime.api.impl.InputSpec;
+
+public class SourceStateTracker {
+
+ private static final Log LOG = LogFactory.getLog(SourceStateTracker.class);
+
+ private final TaskCommunicatorContext taskCommunicatorContext;
+ private final LlapTaskCommunicator taskCommunicator;
+
+ // Tracks vertices for which notifications have been registered
+ private final Set<String> notificationRegisteredVertices = new HashSet<>();
+
+ private final Map<String, SourceInfo> sourceInfoMap = new HashMap<>();
+ private final Map<LlapNodeId, NodeInfo> nodeInfoMap = new HashMap<>();
+
+ private volatile String currentDagName;
+
+
+ public SourceStateTracker(TaskCommunicatorContext taskCommunicatorContext,
+ LlapTaskCommunicator taskCommunicator) {
+ this.taskCommunicatorContext = taskCommunicatorContext;
+ this.taskCommunicator = taskCommunicator;
+ }
+
+ /**
+ * To be invoked when switching to a new DAG, after the previous DAG completes.
+ */
+ public synchronized void resetState(String newDagName) {
+ sourceInfoMap.clear();
+ nodeInfoMap.clear();
+ notificationRegisteredVertices.clear();
+ this.currentDagName = newDagName;
+ }
+
+ public synchronized void addTask(String host, int port, List<InputSpec> inputSpecList) {
+
+ // Add tracking information. Check whether the source state is already known, and send out an update if it is.
+
+ List<String> sourcesOfInterest = getSourceInterestList(inputSpecList);
+ if (sourcesOfInterest != null && !sourcesOfInterest.isEmpty()) {
+ LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+ NodeInfo nodeInfo = getNodeInfo(nodeId);
+
+ // Set up the data structures, before any notifications come in.
+ for (String src : sourcesOfInterest) {
+ VertexState oldStateForNode = nodeInfo.getLastKnownStateForSource(src);
+ if (oldStateForNode == null) {
+ // Not registered for this node.
+ // Register and send state if it is successful.
+ SourceInfo srcInfo = getSourceInfo(src);
+ srcInfo.addNode(nodeId);
+
+ nodeInfo.addSource(src, srcInfo.lastKnownState);
+ if (srcInfo.lastKnownState == VertexState.SUCCEEDED) {
+ sendStateUpdateToNode(nodeId, src, srcInfo.lastKnownState);
+ }
+
+ } else {
+ // Already registered to send updates to this node for the specific source.
+ // Nothing to do for now, unless tracking tasks at a later point.
+ }
+
+ // Setup for actual notifications, if not already done for a previous task.
+ maybeRegisterForVertexUpdates(src);
+ }
+ } else {
+ // Don't need to track anything for this task. No new notifications, etc.
+ }
+ }
+
+ public synchronized void sourceStateUpdated(String sourceName, VertexState sourceState) {
+ SourceInfo sourceInfo = getSourceInfo(sourceName);
+ sourceInfo.lastKnownState = sourceState;
+ // Checking state per node for future failure handling scenarios, where an update
+ // to a single node may fail.
+ for (LlapNodeId nodeId : sourceInfo.getInterestedNodes()) {
+ NodeInfo nodeInfo = nodeInfoMap.get(nodeId);
+ VertexState lastStateForNode = nodeInfo.getLastKnownStateForSource(sourceName);
+ // Send only if the state has changed.
+ if (lastStateForNode != sourceState) {
+ nodeInfo.setLastKnownStateForSource(sourceName, sourceState);
+ sendStateUpdateToNode(nodeId, sourceName, sourceState);
+ }
+ }
+ }
+
+
+ private static class SourceInfo {
+ private final List<LlapNodeId> interestedNodes = new LinkedList<>();
+ // Always start in the running state. Requests for state updates will be sent out after registration.
+ private VertexState lastKnownState = VertexState.RUNNING;
+
+ void addNode(LlapNodeId nodeId) {
+ interestedNodes.add(nodeId);
+ }
+
+ List<LlapNodeId> getInterestedNodes() {
+ return this.interestedNodes;
+ }
+ }
+
+ private synchronized SourceInfo getSourceInfo(String srcName) {
+ SourceInfo sourceInfo = sourceInfoMap.get(srcName);
+ if (sourceInfo == null) {
+ sourceInfo = new SourceInfo();
+ sourceInfoMap.put(srcName, sourceInfo);
+ }
+ return sourceInfo;
+ }
+
+
+ private static class NodeInfo {
+ private final Map<String, VertexState> sourcesOfInterest = new HashMap<>();
+
+ void addSource(String srcName, VertexState sourceState) {
+ sourcesOfInterest.put(srcName, sourceState);
+ }
+
+ VertexState getLastKnownStateForSource(String src) {
+ return sourcesOfInterest.get(src);
+ }
+
+ void setLastKnownStateForSource(String src, VertexState state) {
+ sourcesOfInterest.put(src, state);
+ }
+ }
+
+ private synchronized NodeInfo getNodeInfo(LlapNodeId llapNodeId) {
+ NodeInfo nodeInfo = nodeInfoMap.get(llapNodeId);
+ if (nodeInfo == null) {
+ nodeInfo = new NodeInfo();
+ nodeInfoMap.put(llapNodeId, nodeInfo);
+ }
+ return nodeInfo;
+ }
+
+
+ private List<String> getSourceInterestList(List<InputSpec> inputSpecList) {
+ List<String> sourcesOfInterest = Collections.emptyList();
+ if (inputSpecList != null) {
+ boolean alreadyFound = false;
+ for (InputSpec inputSpec : inputSpecList) {
+ if (isSourceOfInterest(inputSpec)) {
+ if (!alreadyFound) {
+ alreadyFound = true;
+ sourcesOfInterest = new LinkedList<>();
+ }
+ sourcesOfInterest.add(inputSpec.getSourceVertexName());
+ }
+ }
+ }
+ return sourcesOfInterest;
+ }
+
+
+ private void maybeRegisterForVertexUpdates(String sourceName) {
+ if (!notificationRegisteredVertices.contains(sourceName)) {
+ notificationRegisteredVertices.add(sourceName);
+ taskCommunicatorContext.registerForVertexStateUpdates(sourceName, EnumSet.of(
+ VertexState.RUNNING, VertexState.SUCCEEDED));
+ }
+ }
+
+ private boolean isSourceOfInterest(InputSpec inputSpec) {
+ String inputClassName = inputSpec.getInputDescriptor().getClassName();
+ // MRInput is not of interest since it'll always be ready.
+ return !inputClassName.equals(MRInputLegacy.class.getName());
+ }
+
+ void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) {
+ taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(),
+ SourceStateUpdatedRequestProto.newBuilder()
+ .setDagName(currentDagName).setSrcName(sourceName)
+ .setState(Converters.fromVertexState(state)).build());
+ }
+
+
+}
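
SourceStateTracker's core contract, per the comments above: a node that registers late still learns about an already-succeeded source, and updates are pushed only when a node's last known state differs from the new state. A self-contained sketch of that flow (TrackerDemo is illustrative, not the real class):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class TrackerDemo {
      enum State { RUNNING, SUCCEEDED }

      private final Set<String> interestedNodes = new HashSet<>();
      private final Map<String, State> lastSentPerNode = new HashMap<>();
      private State lastKnown = State.RUNNING; // sources always start in the running state

      void addNode(String node) {
        interestedNodes.add(node);
        lastSentPerNode.put(node, lastKnown);
        if (lastKnown == State.SUCCEEDED) {
          send(node, lastKnown); // a late joiner still learns of a finished source
        }
      }

      void sourceStateUpdated(State newState) {
        lastKnown = newState;
        for (String node : interestedNodes) {
          if (lastSentPerNode.get(node) != newState) { // send only if the state changed
            lastSentPerNode.put(node, newState);
            send(node, newState);
          }
        }
      }

      private void send(String node, State state) {
        System.out.println("update " + node + " -> " + state);
      }

      public static void main(String[] args) {
        TrackerDemo t = new TrackerDemo();
        t.addNode("llap-node-1");
        t.sourceStateUpdated(State.SUCCEEDED); // pushes one update
        t.sourceStateUpdated(State.SUCCEEDED); // no change, nothing sent
      }
    }
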
diff --git llap-server/src/protobuf/LlapDaemonProtocol.proto llap-server/src/protobuf/LlapDaemonProtocol.proto
index 4490828..654a155 100644
--- llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -60,6 +60,11 @@ message FragmentSpecProto {
optional int32 attempt_number = 10;
}
+enum SourceStateProto {
+ S_SUCCEEDED = 1;
+ S_RUNNING = 2;
+}
+
message SubmitWorkRequestProto {
optional string container_id_string = 1;
optional string am_host = 2;
@@ -75,6 +80,16 @@ message SubmitWorkRequestProto {
message SubmitWorkResponseProto {
}
+message SourceStateUpdatedRequestProto {
+ optional string dag_name = 1;
+ optional string src_name = 2;
+ optional SourceStateProto state = 3;
+}
+
+message SourceStateUpdatedResponseProto {
+}
+
service LlapDaemonProtocol {
rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
+ rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto);
}
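
End to end, an AM-side caller exercises the new RPC by building a SourceStateUpdatedRequestProto and invoking the blocking client. A sketch using the classes from this patch; the DAG name, source name, host, and port are placeholders, and imports are as in LlapDaemonProtocolClientImpl.java above:

    static void notifySourceSucceeded(Configuration conf, String host, int port)
        throws ServiceException {
      // Placeholder values; real ones come from the Tez VertexStateUpdate.
      SourceStateUpdatedRequestProto request = SourceStateUpdatedRequestProto.newBuilder()
          .setDagName("dag_1")
          .setSrcName("Map 1")
          .setState(SourceStateProto.S_SUCCEEDED)
          .build();
      LlapDaemonProtocolClientImpl client = new LlapDaemonProtocolClientImpl(conf, host, port);
      // The daemon records the state in its per-DAG map and returns the empty response proto.
      client.sourceStateUpdated(null, request);
    }
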