diff --git llap-server/pom.xml llap-server/pom.xml index 22ed693..9325bd9 100644 --- llap-server/pom.xml +++ llap-server/pom.xml @@ -184,6 +184,12 @@ org.apache.tez + tez-mapreduce + ${tez.version} + true + + + org.apache.tez tez-dag ${tez.version} true diff --git llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index 2caca26..3ca2640 100644 --- llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -8,6 +8,88 @@ private LlapDaemonProtocolProtos() {} public static void registerAllExtensions( com.google.protobuf.ExtensionRegistry registry) { } + /** + * Protobuf enum {@code SourceStateProto} + */ + public enum SourceStateProto + implements com.google.protobuf.ProtocolMessageEnum { + /** + * S_SUCCEEDED = 1; + */ + S_SUCCEEDED(0, 1), + /** + * S_RUNNING = 2; + */ + S_RUNNING(1, 2), + ; + + /** + * S_SUCCEEDED = 1; + */ + public static final int S_SUCCEEDED_VALUE = 1; + /** + * S_RUNNING = 2; + */ + public static final int S_RUNNING_VALUE = 2; + + + public final int getNumber() { return value; } + + public static SourceStateProto valueOf(int value) { + switch (value) { + case 1: return S_SUCCEEDED; + case 2: return S_RUNNING; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public SourceStateProto findValueByNumber(int number) { + return SourceStateProto.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return 
getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor().getEnumTypes().get(0); + } + + private static final SourceStateProto[] VALUES = values(); + + public static SourceStateProto valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private SourceStateProto(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:SourceStateProto) + } + public interface UserPayloadProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -7434,134 +7516,1280 @@ public Builder mergeFrom( // @@protoc_insertion_point(class_scope:SubmitWorkResponseProto) } - /** - * Protobuf service {@code LlapDaemonProtocol} - */ - public static abstract class LlapDaemonProtocol - implements com.google.protobuf.Service { - protected LlapDaemonProtocol() {} + public interface SourceStateUpdatedRequestProtoOrBuilder + extends com.google.protobuf.MessageOrBuilder { - public interface Interface { - /** - * rpc submitWork(.SubmitWorkRequestProto) returns (.SubmitWorkResponseProto); - */ - public abstract void submitWork( - com.google.protobuf.RpcController controller, - org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request, - com.google.protobuf.RpcCallback done); + // optional string dag_name = 1; + /** + * optional string dag_name = 1; + */ + boolean hasDagName(); + /** + * optional string dag_name = 1; + */ + java.lang.String getDagName(); + /** + * optional string 
dag_name = 1; + */ + com.google.protobuf.ByteString + getDagNameBytes(); - } + // optional string src_name = 2; + /** + * optional string src_name = 2; + */ + boolean hasSrcName(); + /** + * optional string src_name = 2; + */ + java.lang.String getSrcName(); + /** + * optional string src_name = 2; + */ + com.google.protobuf.ByteString + getSrcNameBytes(); - public static com.google.protobuf.Service newReflectiveService( - final Interface impl) { - return new LlapDaemonProtocol() { - @java.lang.Override - public void submitWork( - com.google.protobuf.RpcController controller, - org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request, - com.google.protobuf.RpcCallback done) { - impl.submitWork(controller, request, done); - } + // optional .SourceStateProto state = 3; + /** + * optional .SourceStateProto state = 3; + */ + boolean hasState(); + /** + * optional .SourceStateProto state = 3; + */ + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto getState(); + } + /** + * Protobuf type {@code SourceStateUpdatedRequestProto} + */ + public static final class SourceStateUpdatedRequestProto extends + com.google.protobuf.GeneratedMessage + implements SourceStateUpdatedRequestProtoOrBuilder { + // Use SourceStateUpdatedRequestProto.newBuilder() to construct. 
+ private SourceStateUpdatedRequestProto(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private SourceStateUpdatedRequestProto(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } - }; + private static final SourceStateUpdatedRequestProto defaultInstance; + public static SourceStateUpdatedRequestProto getDefaultInstance() { + return defaultInstance; } - public static com.google.protobuf.BlockingService - newReflectiveBlockingService(final BlockingInterface impl) { - return new com.google.protobuf.BlockingService() { - public final com.google.protobuf.Descriptors.ServiceDescriptor - getDescriptorForType() { - return getDescriptor(); - } + public SourceStateUpdatedRequestProto getDefaultInstanceForType() { + return defaultInstance; + } - public final com.google.protobuf.Message callBlockingMethod( - com.google.protobuf.Descriptors.MethodDescriptor method, - com.google.protobuf.RpcController controller, - com.google.protobuf.Message request) - throws com.google.protobuf.ServiceException { - if (method.getService() != getDescriptor()) { - throw new java.lang.IllegalArgumentException( - "Service.callBlockingMethod() given method descriptor for " + - "wrong service type."); - } - switch(method.getIndex()) { + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private SourceStateUpdatedRequestProto( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); 
+ switch (tag) { case 0: - return impl.submitWork(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto)request); - default: - throw new java.lang.AssertionError("Can't get here."); + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + dagName_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + srcName_ = input.readBytes(); + break; + } + case 24: { + int rawValue = input.readEnum(); + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto value = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(3, rawValue); + } else { + bitField0_ |= 0x00000004; + state_ = value; + } + break; + } } } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedRequestProto_descriptor; + } - public final com.google.protobuf.Message - getRequestPrototype( - com.google.protobuf.Descriptors.MethodDescriptor method) { - if (method.getService() != getDescriptor()) { - throw new java.lang.IllegalArgumentException( - "Service.getRequestPrototype() given method " + - "descriptor for wrong service type."); - } - switch(method.getIndex()) { - case 0: - return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto.getDefaultInstance(); - default: - throw new 
java.lang.AssertionError("Can't get here."); - } - } + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.Builder.class); + } - public final com.google.protobuf.Message - getResponsePrototype( - com.google.protobuf.Descriptors.MethodDescriptor method) { - if (method.getService() != getDescriptor()) { - throw new java.lang.IllegalArgumentException( - "Service.getResponsePrototype() given method " + - "descriptor for wrong service type."); - } - switch(method.getIndex()) { - case 0: - return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance(); - default: - throw new java.lang.AssertionError("Can't get here."); - } - } + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public SourceStateUpdatedRequestProto parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new SourceStateUpdatedRequestProto(input, extensionRegistry); + } + }; - }; + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; } + private int bitField0_; + // optional string dag_name = 1; + public static final int DAG_NAME_FIELD_NUMBER = 1; + private java.lang.Object dagName_; /** - * rpc submitWork(.SubmitWorkRequestProto) returns (.SubmitWorkResponseProto); + * optional string dag_name = 1; */ - public abstract void submitWork( - com.google.protobuf.RpcController controller, - 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request, - com.google.protobuf.RpcCallback done); - - public static final - com.google.protobuf.Descriptors.ServiceDescriptor - getDescriptor() { - return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor().getServices().get(0); - } - public final com.google.protobuf.Descriptors.ServiceDescriptor - getDescriptorForType() { - return getDescriptor(); + public boolean hasDagName() { + return ((bitField0_ & 0x00000001) == 0x00000001); } - - public final void callMethod( - com.google.protobuf.Descriptors.MethodDescriptor method, - com.google.protobuf.RpcController controller, - com.google.protobuf.Message request, - com.google.protobuf.RpcCallback< - com.google.protobuf.Message> done) { - if (method.getService() != getDescriptor()) { - throw new java.lang.IllegalArgumentException( - "Service.callMethod() given method descriptor for wrong " + - "service type."); + /** + * optional string dag_name = 1; + */ + public java.lang.String getDagName() { + java.lang.Object ref = dagName_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + dagName_ = s; + } + return s; } - switch(method.getIndex()) { - case 0: + } + /** + * optional string dag_name = 1; + */ + public com.google.protobuf.ByteString + getDagNameBytes() { + java.lang.Object ref = dagName_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + dagName_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional string src_name = 2; + public static final int SRC_NAME_FIELD_NUMBER = 2; + private java.lang.Object srcName_; + /** + * optional string src_name = 2; + */ + public boolean 
hasSrcName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional string src_name = 2; + */ + public java.lang.String getSrcName() { + java.lang.Object ref = srcName_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + srcName_ = s; + } + return s; + } + } + /** + * optional string src_name = 2; + */ + public com.google.protobuf.ByteString + getSrcNameBytes() { + java.lang.Object ref = srcName_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + srcName_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional .SourceStateProto state = 3; + public static final int STATE_FIELD_NUMBER = 3; + private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto state_; + /** + * optional .SourceStateProto state = 3; + */ + public boolean hasState() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional .SourceStateProto state = 3; + */ + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto getState() { + return state_; + } + + private void initFields() { + dagName_ = ""; + srcName_ = ""; + state_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getDagNameBytes()); + } + if 
(((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, getSrcNameBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeEnum(3, state_.getNumber()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getDagNameBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, getSrcNameBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(3, state_.getNumber()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto)) { + return super.equals(obj); + } + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto) obj; + + boolean result = true; + result = result && (hasDagName() == other.hasDagName()); + if (hasDagName()) { + result = result && getDagName() + .equals(other.getDagName()); + } + result = result && (hasSrcName() == other.hasSrcName()); + if (hasSrcName()) { + result = result && getSrcName() + .equals(other.getSrcName()); + } + result = result && (hasState() == other.hasState()); + if 
(hasState()) { + result = result && + (getState() == other.getState()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasDagName()) { + hash = (37 * hash) + DAG_NAME_FIELD_NUMBER; + hash = (53 * hash) + getDagName().hashCode(); + } + if (hasSrcName()) { + hash = (37 * hash) + SRC_NAME_FIELD_NUMBER; + hash = (53 * hash) + getSrcName().hashCode(); + } + if (hasState()) { + hash = (37 * hash) + STATE_FIELD_NUMBER; + hash = (53 * hash) + hashEnum(getState()); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); 
+ } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return 
newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code SourceStateUpdatedRequestProto} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProtoOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedRequestProto_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.Builder.class); + } + + // Construct using org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + dagName_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + srcName_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + state_ = 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED; + bitField0_ = (bitField0_ & ~0x00000004); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedRequestProto_descriptor; + } + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto getDefaultInstanceForType() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.getDefaultInstance(); + } + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto build() { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto buildPartial() { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.dagName_ = dagName_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.srcName_ = srcName_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.state_ = state_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto) { + return mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto other) { + if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.getDefaultInstance()) return this; + if (other.hasDagName()) { + bitField0_ |= 0x00000001; + dagName_ = other.dagName_; + onChanged(); + } + if (other.hasSrcName()) { + bitField0_ |= 0x00000002; + srcName_ = other.srcName_; + onChanged(); + } + if (other.hasState()) { + setState(other.getState()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional string dag_name = 1; + private java.lang.Object dagName_ = ""; + /** + * optional string dag_name = 1; + */ + public boolean hasDagName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional string dag_name = 1; + */ + public java.lang.String getDagName() { + java.lang.Object ref = 
dagName_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + dagName_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string dag_name = 1; + */ + public com.google.protobuf.ByteString + getDagNameBytes() { + java.lang.Object ref = dagName_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + dagName_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string dag_name = 1; + */ + public Builder setDagName( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + dagName_ = value; + onChanged(); + return this; + } + /** + * optional string dag_name = 1; + */ + public Builder clearDagName() { + bitField0_ = (bitField0_ & ~0x00000001); + dagName_ = getDefaultInstance().getDagName(); + onChanged(); + return this; + } + /** + * optional string dag_name = 1; + */ + public Builder setDagNameBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + dagName_ = value; + onChanged(); + return this; + } + + // optional string src_name = 2; + private java.lang.Object srcName_ = ""; + /** + * optional string src_name = 2; + */ + public boolean hasSrcName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional string src_name = 2; + */ + public java.lang.String getSrcName() { + java.lang.Object ref = srcName_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + srcName_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string src_name = 2; + */ + public com.google.protobuf.ByteString + getSrcNameBytes() { + java.lang.Object ref = srcName_; + if (ref 
instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + srcName_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string src_name = 2; + */ + public Builder setSrcName( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + srcName_ = value; + onChanged(); + return this; + } + /** + * optional string src_name = 2; + */ + public Builder clearSrcName() { + bitField0_ = (bitField0_ & ~0x00000002); + srcName_ = getDefaultInstance().getSrcName(); + onChanged(); + return this; + } + /** + * optional string src_name = 2; + */ + public Builder setSrcNameBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + srcName_ = value; + onChanged(); + return this; + } + + // optional .SourceStateProto state = 3; + private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto state_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED; + /** + * optional .SourceStateProto state = 3; + */ + public boolean hasState() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional .SourceStateProto state = 3; + */ + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto getState() { + return state_; + } + /** + * optional .SourceStateProto state = 3; + */ + public Builder setState(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + state_ = value; + onChanged(); + return this; + } + /** + * optional .SourceStateProto state = 3; + */ + public Builder clearState() { + bitField0_ = (bitField0_ & ~0x00000004); + state_ = 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:SourceStateUpdatedRequestProto) + } + + static { + defaultInstance = new SourceStateUpdatedRequestProto(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:SourceStateUpdatedRequestProto) + } + + public interface SourceStateUpdatedResponseProtoOrBuilder + extends com.google.protobuf.MessageOrBuilder { + } + /** + * Protobuf type {@code SourceStateUpdatedResponseProto} + */ + public static final class SourceStateUpdatedResponseProto extends + com.google.protobuf.GeneratedMessage + implements SourceStateUpdatedResponseProtoOrBuilder { + // Use SourceStateUpdatedResponseProto.newBuilder() to construct. + private SourceStateUpdatedResponseProto(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private SourceStateUpdatedResponseProto(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final SourceStateUpdatedResponseProto defaultInstance; + public static SourceStateUpdatedResponseProto getDefaultInstance() { + return defaultInstance; + } + + public SourceStateUpdatedResponseProto getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private SourceStateUpdatedResponseProto( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + 
int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedResponseProto_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedResponseProto_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public SourceStateUpdatedResponseProto parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new SourceStateUpdatedResponseProto(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private void initFields() { + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return 
isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto)) { + return super.equals(obj); + } + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto) obj; + + boolean result = true; + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto 
parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom( + 
com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code SourceStateUpdatedResponseProto} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProtoOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedResponseProto_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedResponseProto_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.class, 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.Builder.class); + } + + // Construct using org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_SourceStateUpdatedResponseProto_descriptor; + } + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto getDefaultInstanceForType() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance(); + } + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto build() { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto buildPartial() { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto(this); + onBuilt(); + return result; + } + + 
public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto) { + return mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto other) { + if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance()) return this; + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + + // @@protoc_insertion_point(builder_scope:SourceStateUpdatedResponseProto) + } + + static { + defaultInstance = new SourceStateUpdatedResponseProto(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:SourceStateUpdatedResponseProto) + } + + /** + * Protobuf service {@code LlapDaemonProtocol} + */ + public static abstract class LlapDaemonProtocol + implements com.google.protobuf.Service { + protected LlapDaemonProtocol() {} + + public interface Interface { + /** + * rpc 
submitWork(.SubmitWorkRequestProto) returns (.SubmitWorkResponseProto); + */ + public abstract void submitWork( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request, + com.google.protobuf.RpcCallback done); + + /** + * rpc sourceStateUpdated(.SourceStateUpdatedRequestProto) returns (.SourceStateUpdatedResponseProto); + */ + public abstract void sourceStateUpdated( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request, + com.google.protobuf.RpcCallback done); + + } + + public static com.google.protobuf.Service newReflectiveService( + final Interface impl) { + return new LlapDaemonProtocol() { + @java.lang.Override + public void submitWork( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request, + com.google.protobuf.RpcCallback done) { + impl.submitWork(controller, request, done); + } + + @java.lang.Override + public void sourceStateUpdated( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request, + com.google.protobuf.RpcCallback done) { + impl.sourceStateUpdated(controller, request, done); + } + + }; + } + + public static com.google.protobuf.BlockingService + newReflectiveBlockingService(final BlockingInterface impl) { + return new com.google.protobuf.BlockingService() { + public final com.google.protobuf.Descriptors.ServiceDescriptor + getDescriptorForType() { + return getDescriptor(); + } + + public final com.google.protobuf.Message callBlockingMethod( + com.google.protobuf.Descriptors.MethodDescriptor method, + com.google.protobuf.RpcController controller, + com.google.protobuf.Message request) + throws com.google.protobuf.ServiceException { + if (method.getService() != getDescriptor()) { + 
throw new java.lang.IllegalArgumentException( + "Service.callBlockingMethod() given method descriptor for " + + "wrong service type."); + } + switch(method.getIndex()) { + case 0: + return impl.submitWork(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto)request); + case 1: + return impl.sourceStateUpdated(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto)request); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + public final com.google.protobuf.Message + getRequestPrototype( + com.google.protobuf.Descriptors.MethodDescriptor method) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.getRequestPrototype() given method " + + "descriptor for wrong service type."); + } + switch(method.getIndex()) { + case 0: + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto.getDefaultInstance(); + case 1: + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.getDefaultInstance(); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + public final com.google.protobuf.Message + getResponsePrototype( + com.google.protobuf.Descriptors.MethodDescriptor method) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.getResponsePrototype() given method " + + "descriptor for wrong service type."); + } + switch(method.getIndex()) { + case 0: + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance(); + case 1: + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance(); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + }; + } + + /** + * rpc submitWork(.SubmitWorkRequestProto) 
returns (.SubmitWorkResponseProto); + */ + public abstract void submitWork( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request, + com.google.protobuf.RpcCallback done); + + /** + * rpc sourceStateUpdated(.SourceStateUpdatedRequestProto) returns (.SourceStateUpdatedResponseProto); + */ + public abstract void sourceStateUpdated( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request, + com.google.protobuf.RpcCallback done); + + public static final + com.google.protobuf.Descriptors.ServiceDescriptor + getDescriptor() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor().getServices().get(0); + } + public final com.google.protobuf.Descriptors.ServiceDescriptor + getDescriptorForType() { + return getDescriptor(); + } + + public final void callMethod( + com.google.protobuf.Descriptors.MethodDescriptor method, + com.google.protobuf.RpcController controller, + com.google.protobuf.Message request, + com.google.protobuf.RpcCallback< + com.google.protobuf.Message> done) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.callMethod() given method descriptor for wrong " + + "service type."); + } + switch(method.getIndex()) { + case 0: this.submitWork(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto)request, com.google.protobuf.RpcUtil.specializeCallback( done)); return; + case 1: + this.sourceStateUpdated(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto)request, + com.google.protobuf.RpcUtil.specializeCallback( + done)); + return; default: throw new java.lang.AssertionError("Can't get here."); } @@ -7578,6 +8806,8 @@ public final void callMethod( switch(method.getIndex()) { case 0: return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto.getDefaultInstance(); + case 1: + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); } @@ -7594,6 +8824,8 @@ public final void callMethod( switch(method.getIndex()) { case 0: return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance(); + case 1: + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); } @@ -7629,6 +8861,21 @@ public void submitWork( org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance())); } + + public void sourceStateUpdated( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request, + com.google.protobuf.RpcCallback done) { + channel.callMethod( + getDescriptor().getMethods().get(1), + controller, + request, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance(), + com.google.protobuf.RpcUtil.generalizeCallback( + done, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.class, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance())); + } } public static BlockingInterface newBlockingStub( @@ -7641,6 +8888,11 @@ public static BlockingInterface newBlockingStub( com.google.protobuf.RpcController controller, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws 
com.google.protobuf.ServiceException; + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto sourceStateUpdated( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request) + throws com.google.protobuf.ServiceException; } private static final class BlockingStub implements BlockingInterface { @@ -7661,6 +8913,18 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance()); } + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto sourceStateUpdated( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request) + throws com.google.protobuf.ServiceException { + return (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto) channel.callBlockingMethod( + getDescriptor().getMethods().get(1), + controller, + request, + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto.getDefaultInstance()); + } + } // @@protoc_insertion_point(class_scope:LlapDaemonProtocol) @@ -7701,6 +8965,16 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_SubmitWorkResponseProto_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_SourceStateUpdatedRequestProto_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_SourceStateUpdatedResponseProto_descriptor; + private static + 
com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_SourceStateUpdatedResponseProto_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -7736,11 +9010,18 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { "ary\030\005 \001(\014\022\014\n\004user\030\006 \001(\t\022\035\n\025application_i" + "d_string\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001" + "(\005\022)\n\rfragment_spec\030\t \001(\0132\022.FragmentSpec" + - "Proto\"\031\n\027SubmitWorkResponseProto2U\n\022Llap" + - "DaemonProtocol\022?\n\nsubmitWork\022\027.SubmitWor" + - "kRequestProto\032\030.SubmitWorkResponseProtoB" + - "H\n&org.apache.hadoop.hive.llap.daemon.rp", - "cB\030LlapDaemonProtocolProtos\210\001\001\240\001\001" + "Proto\"\031\n\027SubmitWorkResponseProto\"f\n\036Sour" + + "ceStateUpdatedRequestProto\022\020\n\010dag_name\030\001" + + " \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(\0162\021." 
+ + "SourceStateProto\"!\n\037SourceStateUpdatedRe", + "sponseProto*2\n\020SourceStateProto\022\017\n\013S_SUC" + + "CEEDED\020\001\022\r\n\tS_RUNNING\020\0022\256\001\n\022LlapDaemonPr" + + "otocol\022?\n\nsubmitWork\022\027.SubmitWorkRequest" + + "Proto\032\030.SubmitWorkResponseProto\022W\n\022sourc" + + "eStateUpdated\022\037.SourceStateUpdatedReques" + + "tProto\032 .SourceStateUpdatedResponseProto" + + "BH\n&org.apache.hadoop.hive.llap.daemon.r" + + "pcB\030LlapDaemonProtocolProtos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -7789,6 +9070,18 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SubmitWorkResponseProto_descriptor, new java.lang.String[] { }); + internal_static_SourceStateUpdatedRequestProto_descriptor = + getDescriptor().getMessageTypes().get(7); + internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_SourceStateUpdatedRequestProto_descriptor, + new java.lang.String[] { "DagName", "SrcName", "State", }); + internal_static_SourceStateUpdatedResponseProto_descriptor = + getDescriptor().getMessageTypes().get(8); + internal_static_SourceStateUpdatedResponseProto_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_SourceStateUpdatedResponseProto_descriptor, + new java.lang.String[] { }); return null; } }; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java index 0b1303d..82f3b59 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java @@ -16,9 
+16,12 @@ import java.io.IOException; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; public interface ContainerRunner { void submitWork(SubmitWorkRequestProto request) throws IOException; + + void sourceStateUpdated(SourceStateUpdatedRequestProto request); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 986ba24..c9baba1 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -179,6 +179,9 @@ protected Void callInternal() { } amNodeInfo.stopUmbilical(); } else { + // Add back to the queue for the next heartbeat, and schedule the actual heartbeat + amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval); + pendingHeartbeatQueeu.add(amNodeInfo); executor.submit(new AMHeartbeatCallable(amNodeInfo)); } } catch (InterruptedException e) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 989aad0..c142982 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -21,7 +21,10 @@ import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; @@ -39,6 +42,8 @@ import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; @@ -62,8 +67,11 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; +import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult; @@ -94,6 +102,9 @@ private final LlapDaemonExecutorMetrics metrics; private final Configuration conf; private final ConfParams confParams; + + // Map of dagId to vertices and associated state. 
+ private final ConcurrentMap> sourceCompletionMap = new ConcurrentHashMap<>(); // TODO Support for removing queued containers, interrupting / killing specific containers public ContainerRunnerImpl(Configuration conf, int numExecutors, String[] localDirsBase, int localShufflePort, @@ -141,7 +152,8 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, String[] localD @Override public void serviceStart() { // The node id will only be available at this point, since the server has been started in LlapDaemon - LlapNodeId llapNodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort()); + LlapNodeId llapNodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), + localAddress.get().getPort()); this.amReporter = new AMReporter(llapNodeId, conf); amReporter.init(conf); amReporter.start(); @@ -172,7 +184,7 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(), request.getFragmentSpec().getAttemptNumber()); - LOG.info("Queueing container for execution: " + stringifyRequest(request)); + LOG.info("Queueing container for execution: " + stringifySubmitRequest(request)); // This is the start of container-annotated logging. // TODO Reduce the length of this string. Way too verbose at the moment. 
String ndcContextString = @@ -215,9 +227,10 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { LOG.info("DEBUG: Registering request with the ShuffleHandler"); ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser(), localDirs); + ConcurrentMap sourceCompletionMap = getSourceCompletionMap(request.getFragmentSpec().getDagName()); TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()), new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs, - credentials, memoryPerExecutor, amReporter, confParams); + credentials, memoryPerExecutor, amReporter, sourceCompletionMap, confParams); ListenableFuture future = executorService.submit(callable); Futures.addCallback(future, new TaskRunnerCallback(request, callable)); metrics.incrExecutorTotalRequestsHandled(); @@ -227,6 +240,13 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { } } + @Override + public void sourceStateUpdated(SourceStateUpdatedRequestProto request) { + LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request)); + ConcurrentMap dagMap = getSourceCompletionMap(request.getDagName()); + dagMap.put(request.getSrcName(), request.getState()); + } + static class TaskRunnerCallable extends CallableWithNdc { private final SubmitWorkRequestProto request; @@ -241,6 +261,8 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { private final ConfParams confParams; private final Token jobToken; private final AMReporter amReporter; + private final ConcurrentMap sourceCompletionMap; + private final TaskSpec taskSpec; private volatile TezTaskRunner taskRunner; private volatile TaskReporterInterface taskReporter; private volatile ListeningExecutorService executor; @@ -254,17 +276,20 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { TaskRunnerCallable(SubmitWorkRequestProto request, Configuration 
conf, ExecutionContext executionContext, Map envMap, String[] localDirs, Credentials credentials, - long memoryAvailable, AMReporter amReporter, ConfParams confParams) { + long memoryAvailable, AMReporter amReporter, + ConcurrentMap sourceCompletionMap, ConfParams confParams) { this.request = request; this.conf = conf; this.executionContext = executionContext; this.envMap = envMap; this.localDirs = localDirs; this.objectRegistry = new ObjectRegistryImpl(); + this.sourceCompletionMap = sourceCompletionMap; this.credentials = credentials; this.memoryAvailable = memoryAvailable; this.confParams = confParams; - jobToken = TokenCache.getSessionToken(credentials); + this.jobToken = TokenCache.getSessionToken(credentials); + this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec()); this.amReporter = amReporter; // Register with the AMReporter when the callable is setup. Unregister once it starts running. this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken); @@ -274,6 +299,10 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { protected ContainerExecutionResult callInternal() throws Exception { this.startTime = System.currentTimeMillis(); this.threadName = Thread.currentThread().getName(); + if (LOG.isDebugEnabled()) { + LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); + } + // Unregister from the AMReporter, since the task is now running. 
this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort()); @@ -320,7 +349,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { request.getContainerIdString()); taskRunner = new TezTaskRunner(conf, taskUgi, localDirs, - Converters.getTaskSpecfromProto(request.getFragmentSpec()), + taskSpec, request.getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, @@ -346,10 +375,51 @@ public LlapTaskUmbilicalProtocol run() throws Exception { } LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + sw.stop().elapsedMillis()); + if (LOG.isDebugEnabled()) { + LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); + } + return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, null); } + /** + * Check whether a task can run to completion or may end up blocking on its sources. + * This currently happens via looking up source state. + * TODO: Eventually, this should look up the Hive Processor to figure out whether + * it's reached a state where it can finish - especially in cases of failures + * after data has been fetched. + * @return true if every source of interest is recorded as S_SUCCEEDED (or there are none), false otherwise + */ + public boolean canFinish() { + List inputSpecList = taskSpec.getInputs(); + boolean canFinish = true; + if (inputSpecList != null && !inputSpecList.isEmpty()) { + for (InputSpec inputSpec : inputSpecList) { + if (isSourceOfInterest(inputSpec)) { + // Look up the state in the map.
+ SourceStateProto state = sourceCompletionMap.get(inputSpec.getSourceVertexName()); + if (state != null && state == SourceStateProto.S_SUCCEEDED) { + continue; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot finish due to source: " + inputSpec.getSourceVertexName()); + } + canFinish = false; + break; + } + } + } + } + return canFinish; + } + + private boolean isSourceOfInterest(InputSpec inputSpec) { + String inputClassName = inputSpec.getInputDescriptor().getClassName(); + // MRInput is not of interest since it'll always be ready. + return !inputClassName.equals(MRInputLegacy.class.getName()); + } + public void shutdown() { executor.shutdownNow(); if (taskReporter != null) { @@ -444,7 +514,15 @@ public ConfParams(int amHeartbeatIntervalMsMax, long amCounterHeartbeatInterval, } } - private String stringifyRequest(SubmitWorkRequestProto request) { + private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) { + StringBuilder sb = new StringBuilder(); + sb.append("dagName=").append(request.getDagName()) + .append(", ").append("sourceName=").append(request.getSrcName()) + .append(", ").append("state=").append(request.getState()); + return sb.toString(); + } + + private String stringifySubmitRequest(SubmitWorkRequestProto request) { StringBuilder sb = new StringBuilder(); sb.append("am_details=").append(request.getAmHost()).append(":").append(request.getAmPort()); sb.append(", user=").append(request.getUser()); @@ -462,25 +540,40 @@ private String stringifyRequest(SubmitWorkRequestProto request) { sb.append(", Inputs={"); if (fragmentSpec.getInputSpecsCount() > 0) { for (IOSpecProto ioSpec : fragmentSpec.getInputSpecsList()) { - sb.append("{").append(ioSpec.getConnectedVertexName()).append(",").append(ioSpec.getIoDescriptor().getClassName()).append(",").append(ioSpec.getPhysicalEdgeCount()).append("}"); + sb.append("{").append(ioSpec.getConnectedVertexName()).append(",") + 
.append(ioSpec.getIoDescriptor().getClassName()).append(",") + .append(ioSpec.getPhysicalEdgeCount()).append("}"); } } sb.append("}"); sb.append(", Outputs={"); if (fragmentSpec.getOutputSpecsCount() > 0) { for (IOSpecProto ioSpec : fragmentSpec.getOutputSpecsList()) { - sb.append("{").append(ioSpec.getConnectedVertexName()).append(",").append(ioSpec.getIoDescriptor().getClassName()).append(",").append(ioSpec.getPhysicalEdgeCount()).append("}"); + sb.append("{").append(ioSpec.getConnectedVertexName()).append(",") + .append(ioSpec.getIoDescriptor().getClassName()).append(",") + .append(ioSpec.getPhysicalEdgeCount()).append("}"); } } sb.append("}"); sb.append(", GroupedInputs={"); if (fragmentSpec.getGroupedInputSpecsCount() > 0) { for (GroupInputSpecProto group : fragmentSpec.getGroupedInputSpecsList()) { - sb.append("{").append("groupName=").append(group.getGroupName()).append(", elements=").append(group.getGroupVerticesList()).append("}"); + sb.append("{").append("groupName=").append(group.getGroupName()).append(", elements=") + .append(group.getGroupVerticesList()).append("}"); sb.append(group.getGroupVerticesList()); } } sb.append("}"); return sb.toString(); } + + private ConcurrentMap getSourceCompletionMap(String dagName) { + ConcurrentMap dagMap = sourceCompletionMap.get(dagName); + if (dagMap == null) { + dagMap = new ConcurrentHashMap<>(); + ConcurrentMap old = sourceCompletionMap.putIfAbsent(dagName, dagMap); + dagMap = (old != null) ? 
old : dagMap; + } + return dagMap; + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index cea7692..eb8d64b 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -242,6 +242,11 @@ public void submitWork(LlapDaemonProtocolProtos.SubmitWorkRequestProto request) containerRunner.submitWork(request); } + @Override + public void sourceStateUpdated(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request) { + containerRunner.sourceStateUpdated(request); + } + @VisibleForTesting public long getNumSubmissions() { return numSubmissions.get(); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java index ae74ae8..01b53c2 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java @@ -20,7 +20,10 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; @@ -42,8 +45,8 @@ public 
LlapDaemonProtocolClientImpl(Configuration conf, String hostname, int por } @Override - public LlapDaemonProtocolProtos.SubmitWorkResponseProto submitWork(RpcController controller, - LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws + public SubmitWorkResponseProto submitWork(RpcController controller, + SubmitWorkRequestProto request) throws ServiceException { try { return getProxy().submitWork(null, request); @@ -52,6 +55,17 @@ public LlapDaemonProtocolClientImpl(Configuration conf, String hostname, int por } } + @Override + public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller, + SourceStateUpdatedRequestProto request) throws + ServiceException { + try { + return getProxy().sourceStateUpdated(null, request); + } catch (IOException e) { + throw new ServiceException(e); + } + } + public LlapDaemonProtocolBlockingPB getProxy() throws IOException { if (proxy == null) { proxy = createProxy(); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java index 5ea11fd..0360a27 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java @@ -27,6 +27,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import 
org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; @@ -62,7 +65,7 @@ public LlapDaemonProtocolServerImpl(int numHandlers, @Override public SubmitWorkResponseProto submitWork(RpcController controller, - LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws + SubmitWorkRequestProto request) throws ServiceException { try { containerRunner.submitWork(request); @@ -73,6 +76,14 @@ public SubmitWorkResponseProto submitWork(RpcController controller, } @Override + public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller, + SourceStateUpdatedRequestProto request) throws + ServiceException { + containerRunner.sourceStateUpdated(request); + return SourceStateUpdatedResponseProto.getDefaultInstance(); + } + + @Override public void serviceStart() { Configuration conf = getConfig(); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java index 7e06f2b..5bd1fe9 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.dag.api.EntityDescriptor; @@ -31,6 +32,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.event.VertexState; import 
org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.apache.tez.runtime.api.impl.InputSpec; @@ -250,4 +252,15 @@ private static UserPayload convertPayloadFromProto( return userPayload; } + public static SourceStateProto fromVertexState(VertexState state) { + switch (state) { + case SUCCEEDED: + return SourceStateProto.S_SUCCEEDED; + case RUNNING: + return SourceStateProto.S_RUNNING; + default: + throw new RuntimeException("Unexpected state: " + state); + } + } + } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 0677de1..e1610fe 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -31,9 +31,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; +import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtocolSignature; @@ -51,6 +54,7 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.event.VertexStateUpdate; import 
org.apache.tez.dag.app.TezTaskCommunicatorImpl; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -64,11 +68,16 @@ private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST; private final ConcurrentMap credentialMap; + // Tracks containerIds and taskAttemptIds, so can be kept independent of the running DAG. + // When DAG specific cleanup happens, it'll be better to link this to a DAG though. private final EntityTracker entityTracker = new EntityTracker(); + private final SourceStateTracker sourceStateTracker; private TaskCommunicator communicator; private final LlapTaskUmbilicalProtocol umbilical; + private volatile String currentDagName; + public LlapTaskCommunicator( TaskCommunicatorContext taskCommunicatorContext) { super(taskCommunicatorContext); @@ -87,6 +96,7 @@ public LlapTaskCommunicator( BASE_SUBMIT_WORK_REQUEST = baseBuilder.build(); credentialMap = new ConcurrentHashMap<>(); + sourceStateTracker = new SourceStateTracker(getTaskCommunicatorContext(), this); } @Override @@ -153,6 +163,7 @@ public void registerContainerEnd(ContainerId containerId) { entityTracker.unregisterContainer(containerId); } + @Override public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec, Map additionalResources, @@ -161,6 +172,9 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task int priority) { super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority); + if (!taskSpec.getDAGName().equals(currentDagName)) { + resetCurrentDag(taskSpec.getDAGName()); + } SubmitWorkRequestProto requestProto; try { @@ -184,6 +198,8 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port); + sourceStateTracker.addTask(host, port, taskSpec.getInputs()); + // Have to register this up front right now.
Otherwise, it's possible for the task to start // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them. getTaskCommunicatorContext() @@ -240,6 +256,45 @@ public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) { // be told that it needs to die since it isn't recognized. } + @Override + public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) { + // Delegate updates over to the source state tracker. + sourceStateTracker + .sourceStateUpdated(vertexStateUpdate.getVertexName(), vertexStateUpdate.getVertexState()); + } + + public void sendStateUpdate(final String host, final int port, + final SourceStateUpdatedRequestProto request) { + communicator.sendSourceStateUpdate(request, host, port, + new TaskCommunicator.ExecuteRequestCallback() { + @Override + public void setResponse(SourceStateUpdatedResponseProto response) { + } + + @Override + public void indicateError(Throwable t) { + // TODO HIVE-10280. + // Ideally, this should be retried for a while, after which the node should be marked as failed. + // Considering tasks are supposed to run fast. Failing the task immediately may be a good option. + LOG.error( + "Failed to send state update to node: " + host + ":" + port + ", StateUpdate=" + + request, t); + } + }); + } + + + + private void resetCurrentDag(String newDagName) { + // Working on the assumption that a single DAG runs at a time per AM. + currentDagName = newDagName; + sourceStateTracker.resetState(newDagName); + LOG.info("CurrentDag set to: " + newDagName); + // TODO Additional state reset. Potentially sending messages to node to reset. + // Is it possible for heartbeats to come in from lost tasks - those should be told to die, which + // is likely already happening. 
+ } + private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId, TaskSpec taskSpec) throws IOException { @@ -411,4 +466,4 @@ void nodePinged(String hostname, int port) { } } } -} +} \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java index 1670a48..3b4612d 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java @@ -27,14 +27,20 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.Message; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.service.AbstractService; public class TaskCommunicator extends AbstractService { + private static final Log LOG = LogFactory.getLog(TaskCommunicator.class); + private final ConcurrentMap hostProxies; private ListeningExecutorService executor; @@ -53,7 +59,7 @@ public void serviceStop() { public void submitWork(SubmitWorkRequestProto request, String host, int port, final ExecuteRequestCallback callback) { - ListenableFuture future = executor.submit(new SubmitWorkCallable(request, host, port)); + ListenableFuture future = executor.submit(new 
SubmitWorkCallable(host, port, request)); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(SubmitWorkResponseProto result) { @@ -68,23 +74,66 @@ public void onFailure(Throwable t) { } - private class SubmitWorkCallable implements Callable { + public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host, final int port, + final ExecuteRequestCallback callback) { + ListenableFuture future = + executor.submit(new SendSourceStateUpdateCallable(host, port, request)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(SourceStateUpdatedResponseProto result) { + callback.setResponse(result); + } + + @Override + public void onFailure(Throwable t) { + callback.indicateError(t); + } + }); + } + + private static abstract class CallableRequest implements Callable { + final String hostname; final int port; - final SubmitWorkRequestProto request; + final REQUEST request; - private SubmitWorkCallable(SubmitWorkRequestProto request, String hostname, int port) { + + protected CallableRequest(String hostname, int port, REQUEST request) { this.hostname = hostname; this.port = port; this.request = request; } + public abstract RESPONSE call() throws Exception; + } + + private class SubmitWorkCallable extends CallableRequest { + + protected SubmitWorkCallable(String hostname, int port, + SubmitWorkRequestProto submitWorkRequestProto) { + super(hostname, port, submitWorkRequestProto); + } + @Override public SubmitWorkResponseProto call() throws Exception { return getProxy(hostname, port).submitWork(null, request); } } + private class SendSourceStateUpdateCallable + extends CallableRequest { + + public SendSourceStateUpdateCallable(String hostname, int port, + SourceStateUpdatedRequestProto request) { + super(hostname, port, request); + } + + @Override + public SourceStateUpdatedResponseProto call() throws Exception { + return getProxy(hostname, port).sourceStateUpdated(null, 
request); + } + } + public interface ExecuteRequestCallback { void setResponse(T response); void indicateError(Throwable t); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java new file mode 100644 index 0000000..698de76 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java @@ -0,0 +1,212 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.tezplugins.helpers; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.llap.LlapNodeId; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.tezplugins.Converters; +import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator; +import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.runtime.api.impl.InputSpec; + +public class SourceStateTracker { + + private static final Log LOG = LogFactory.getLog(SourceStateTracker.class); + + private final TaskCommunicatorContext taskCommunicatorContext; + private final LlapTaskCommunicator taskCommunicator; + + // Tracks vertices for which notifications have been registered + private final Set notificationRegisteredVertices = new HashSet<>(); + + private final Map sourceInfoMap = new HashMap<>(); + private final Map nodeInfoMap = new HashMap<>(); + + private volatile String currentDagName; + + + public SourceStateTracker(TaskCommunicatorContext taskCommunicatorContext, + LlapTaskCommunicator taskCommunicator) { + this.taskCommunicatorContext = taskCommunicatorContext; + this.taskCommunicator = taskCommunicator; + } + + /** + * To be invoked after each DAG completes. + */ + public synchronized void resetState(String newDagName) { + sourceInfoMap.clear(); + nodeInfoMap.clear(); + notificationRegisteredVertices.clear(); + this.currentDagName = newDagName; + } + + public synchronized void addTask(String host, int port, List inputSpecList) { + + // Add tracking information. 
Check if source state already known and send out an update if it is. + + List sourcesOfInterest = getSourceInterestList(inputSpecList); + if (sourcesOfInterest != null && !sourcesOfInterest.isEmpty()) { + LlapNodeId nodeId = LlapNodeId.getInstance(host, port); + NodeInfo nodeInfo = getNodeInfo(nodeId); + + // Set up the data structures, before any notifications come in. + for (String src : sourcesOfInterest) { + VertexState oldStateForNode = nodeInfo.getLastKnownStateForSource(src); + if (oldStateForNode == null) { + // Not registered for this node. + // Register and send state if it is successful. + SourceInfo srcInfo = getSourceInfo(src); + srcInfo.addNode(nodeId); + + nodeInfo.addSource(src, srcInfo.lastKnownState); + if (srcInfo.lastKnownState == VertexState.SUCCEEDED) { + sendStateUpdateToNode(nodeId, src, srcInfo.lastKnownState); + } + + } else { + // Already registered to send updates to this node for the specific source. + // Nothing to do for now, unless tracking tasks at a later point. + } + + // Setup for actual notifications, if not already done for a previous task. + maybeRegisterForVertexUpdates(src); + } + } else { + // Don't need to track anything for this task. No new notifications, etc. + } + } + + public synchronized void sourceStateUpdated(String sourceName, VertexState sourceState) { + SourceInfo sourceInfo = getSourceInfo(sourceName); + sourceInfo.lastKnownState = sourceState; + // Checking state per node for future failure handling scenarios, where an update + // to a single node may fail. + for (LlapNodeId nodeId : sourceInfo.getInterestedNodes()) { + NodeInfo nodeInfo = nodeInfoMap.get(nodeId); + VertexState lastStateForNode = nodeInfo.getLastKnownStateForSource(sourceName); + // Send only if the state has changed. 
+ if (lastStateForNode != sourceState) { + nodeInfo.setLastKnownStateForSource(sourceName, sourceState); + sendStateUpdateToNode(nodeId, sourceName, sourceState); + } + } + } + + + private static class SourceInfo { + private final List interestedNodes = new LinkedList<>(); + // Always start in the running state. Requests for state updates will be sent out after registration. + private VertexState lastKnownState = VertexState.RUNNING; + + void addNode(LlapNodeId nodeId) { + interestedNodes.add(nodeId); + } + + List getInterestedNodes() { + return this.interestedNodes; + } + } + + private synchronized SourceInfo getSourceInfo(String srcName) { + SourceInfo sourceInfo = sourceInfoMap.get(srcName); + if (sourceInfo == null) { + sourceInfo = new SourceInfo(); + sourceInfoMap.put(srcName, sourceInfo); + } + return sourceInfo; + } + + + private static class NodeInfo { + private final Map sourcesOfInterest = new HashMap<>(); + + void addSource(String srcName, VertexState sourceState) { + sourcesOfInterest.put(srcName, sourceState); + } + + VertexState getLastKnownStateForSource(String src) { + return sourcesOfInterest.get(src); + } + + void setLastKnownStateForSource(String src, VertexState state) { + sourcesOfInterest.put(src, state); + } + } + + private synchronized NodeInfo getNodeInfo(LlapNodeId llapNodeId) { + NodeInfo nodeInfo = nodeInfoMap.get(llapNodeId); + if (nodeInfo == null) { + nodeInfo = new NodeInfo(); + nodeInfoMap.put(llapNodeId, nodeInfo); + } + return nodeInfo; + } + + + private List getSourceInterestList(List inputSpecList) { + List sourcesOfInterest = Collections.emptyList(); + if (inputSpecList != null) { + boolean alreadyFound = false; + for (InputSpec inputSpec : inputSpecList) { + if (isSourceOfInterest(inputSpec)) { + if (!alreadyFound) { + alreadyFound = true; + sourcesOfInterest = new LinkedList<>(); + } + sourcesOfInterest.add(inputSpec.getSourceVertexName()); + } + } + } + return sourcesOfInterest; + } + + + private void 
maybeRegisterForVertexUpdates(String sourceName) { + if (!notificationRegisteredVertices.contains(sourceName)) { + notificationRegisteredVertices.add(sourceName); + taskCommunicatorContext.registerForVertexStateUpdates(sourceName, EnumSet.of( + VertexState.RUNNING, VertexState.SUCCEEDED)); + } + } + + private boolean isSourceOfInterest(InputSpec inputSpec) { + String inputClassName = inputSpec.getInputDescriptor().getClassName(); + // MRInput is not of interest since it'll always be ready. + return !inputClassName.equals(MRInputLegacy.class.getName()); + } + + void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) { + taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(), + SourceStateUpdatedRequestProto.newBuilder().setDagName(currentDagName).setSrcName( + sourceName) + .setState(Converters.fromVertexState(state)).build()); + } + + +} diff --git llap-server/src/protobuf/LlapDaemonProtocol.proto llap-server/src/protobuf/LlapDaemonProtocol.proto index 4490828..654a155 100644 --- llap-server/src/protobuf/LlapDaemonProtocol.proto +++ llap-server/src/protobuf/LlapDaemonProtocol.proto @@ -60,6 +60,11 @@ message FragmentSpecProto { optional int32 attempt_number = 10; } +enum SourceStateProto { + S_SUCCEEDED = 1; + S_RUNNING = 2; +} + message SubmitWorkRequestProto { optional string container_id_string = 1; optional string am_host = 2; @@ -75,6 +80,16 @@ message SubmitWorkRequestProto { message SubmitWorkResponseProto { } +message SourceStateUpdatedRequestProto { + optional string dag_name = 1; + optional string src_name = 2; + optional SourceStateProto state = 3; +} + +message SourceStateUpdatedResponseProto { +} + service LlapDaemonProtocol { rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto); + rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto); }