diff --git llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index d378955..b044df9 100644 --- llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -3124,20 +3124,20 @@ public Builder clearMergedInputDescriptor() { public interface FragmentSpecProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { - // optional string task_attempt_id_string = 1; + // optional string fragment_identifier_string = 1; /** - * optional string task_attempt_id_string = 1; + * optional string fragment_identifier_string = 1; */ - boolean hasTaskAttemptIdString(); + boolean hasFragmentIdentifierString(); /** - * optional string task_attempt_id_string = 1; + * optional string fragment_identifier_string = 1; */ - java.lang.String getTaskAttemptIdString(); + java.lang.String getFragmentIdentifierString(); /** - * optional string task_attempt_id_string = 1; + * optional string fragment_identifier_string = 1; */ com.google.protobuf.ByteString - getTaskAttemptIdStringBytes(); + getFragmentIdentifierStringBytes(); // optional string dag_name = 2; /** @@ -3341,7 +3341,7 @@ private FragmentSpecProto( } case 10: { bitField0_ |= 0x00000001; - taskAttemptIdString_ = input.readBytes(); + fragmentIdentifierString_ = input.readBytes(); break; } case 18: { @@ -3455,20 +3455,20 @@ public FragmentSpecProto parsePartialFrom( } private int bitField0_; - // optional string task_attempt_id_string = 1; - public static final int TASK_ATTEMPT_ID_STRING_FIELD_NUMBER = 1; - private java.lang.Object taskAttemptIdString_; + // optional string fragment_identifier_string = 1; + public static final int FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER = 1; + private java.lang.Object 
fragmentIdentifierString_; /** - * optional string task_attempt_id_string = 1; + * optional string fragment_identifier_string = 1; */ - public boolean hasTaskAttemptIdString() { + public boolean hasFragmentIdentifierString() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * optional string task_attempt_id_string = 1; + * optional string fragment_identifier_string = 1; */ - public java.lang.String getTaskAttemptIdString() { - java.lang.Object ref = taskAttemptIdString_; + public java.lang.String getFragmentIdentifierString() { + java.lang.Object ref = fragmentIdentifierString_; if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { @@ -3476,22 +3476,22 @@ public boolean hasTaskAttemptIdString() { (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { - taskAttemptIdString_ = s; + fragmentIdentifierString_ = s; } return s; } } /** - * optional string task_attempt_id_string = 1; + * optional string fragment_identifier_string = 1; */ public com.google.protobuf.ByteString - getTaskAttemptIdStringBytes() { - java.lang.Object ref = taskAttemptIdString_; + getFragmentIdentifierStringBytes() { + java.lang.Object ref = fragmentIdentifierString_; if (ref instanceof java.lang.String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - taskAttemptIdString_ = b; + fragmentIdentifierString_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; @@ -3763,7 +3763,7 @@ public int getAttemptNumber() { } private void initFields() { - taskAttemptIdString_ = ""; + fragmentIdentifierString_ = ""; dagName_ = ""; vertexName_ = ""; processorDescriptor_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto.getDefaultInstance(); @@ -3787,7 +3787,7 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 
0x00000001)) { - output.writeBytes(1, getTaskAttemptIdStringBytes()); + output.writeBytes(1, getFragmentIdentifierStringBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getDagNameBytes()); @@ -3827,7 +3827,7 @@ public int getSerializedSize() { size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(1, getTaskAttemptIdStringBytes()); + .computeBytesSize(1, getFragmentIdentifierStringBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream @@ -3888,10 +3888,10 @@ public boolean equals(final java.lang.Object obj) { org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto) obj; boolean result = true; - result = result && (hasTaskAttemptIdString() == other.hasTaskAttemptIdString()); - if (hasTaskAttemptIdString()) { - result = result && getTaskAttemptIdString() - .equals(other.getTaskAttemptIdString()); + result = result && (hasFragmentIdentifierString() == other.hasFragmentIdentifierString()); + if (hasFragmentIdentifierString()) { + result = result && getFragmentIdentifierString() + .equals(other.getFragmentIdentifierString()); } result = result && (hasDagName() == other.hasDagName()); if (hasDagName()) { @@ -3942,9 +3942,9 @@ public int hashCode() { } int hash = 41; hash = (19 * hash) + getDescriptorForType().hashCode(); - if (hasTaskAttemptIdString()) { - hash = (37 * hash) + TASK_ATTEMPT_ID_STRING_FIELD_NUMBER; - hash = (53 * hash) + getTaskAttemptIdString().hashCode(); + if (hasFragmentIdentifierString()) { + hash = (37 * hash) + FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER; + hash = (53 * hash) + getFragmentIdentifierString().hashCode(); } if (hasDagName()) { hash = (37 * hash) + DAG_NAME_FIELD_NUMBER; @@ -4095,7 +4095,7 @@ private static Builder create() { public Builder clear() { super.clear(); - 
taskAttemptIdString_ = ""; + fragmentIdentifierString_ = ""; bitField0_ = (bitField0_ & ~0x00000001); dagName_ = ""; bitField0_ = (bitField0_ & ~0x00000002); @@ -4162,7 +4162,7 @@ public Builder clone() { if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } - result.taskAttemptIdString_ = taskAttemptIdString_; + result.fragmentIdentifierString_ = fragmentIdentifierString_; if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } @@ -4234,9 +4234,9 @@ public Builder mergeFrom(com.google.protobuf.Message other) { public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto other) { if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto.getDefaultInstance()) return this; - if (other.hasTaskAttemptIdString()) { + if (other.hasFragmentIdentifierString()) { bitField0_ |= 0x00000001; - taskAttemptIdString_ = other.taskAttemptIdString_; + fragmentIdentifierString_ = other.fragmentIdentifierString_; onChanged(); } if (other.hasDagName()) { @@ -4366,76 +4366,76 @@ public Builder mergeFrom( } private int bitField0_; - // optional string task_attempt_id_string = 1; - private java.lang.Object taskAttemptIdString_ = ""; + // optional string fragment_identifier_string = 1; + private java.lang.Object fragmentIdentifierString_ = ""; /** - * optional string task_attempt_id_string = 1; + * optional string fragment_identifier_string = 1; */ - public boolean hasTaskAttemptIdString() { + public boolean hasFragmentIdentifierString() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * optional string task_attempt_id_string = 1; + * optional string fragment_identifier_string = 1; */ - public java.lang.String getTaskAttemptIdString() { - java.lang.Object ref = taskAttemptIdString_; + public java.lang.String getFragmentIdentifierString() { + java.lang.Object ref = fragmentIdentifierString_; if (!(ref instanceof java.lang.String)) { 
java.lang.String s = ((com.google.protobuf.ByteString) ref) .toStringUtf8(); - taskAttemptIdString_ = s; + fragmentIdentifierString_ = s; return s; } else { return (java.lang.String) ref; } } /** - * optional string task_attempt_id_string = 1; + * optional string fragment_identifier_string = 1; */ public com.google.protobuf.ByteString - getTaskAttemptIdStringBytes() { - java.lang.Object ref = taskAttemptIdString_; + getFragmentIdentifierStringBytes() { + java.lang.Object ref = fragmentIdentifierString_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - taskAttemptIdString_ = b; + fragmentIdentifierString_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** - * optional string task_attempt_id_string = 1; + * optional string fragment_identifier_string = 1; */ - public Builder setTaskAttemptIdString( + public Builder setFragmentIdentifierString( java.lang.String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000001; - taskAttemptIdString_ = value; + fragmentIdentifierString_ = value; onChanged(); return this; } /** - * optional string task_attempt_id_string = 1; + * optional string fragment_identifier_string = 1; */ - public Builder clearTaskAttemptIdString() { + public Builder clearFragmentIdentifierString() { bitField0_ = (bitField0_ & ~0x00000001); - taskAttemptIdString_ = getDefaultInstance().getTaskAttemptIdString(); + fragmentIdentifierString_ = getDefaultInstance().getFragmentIdentifierString(); onChanged(); return this; } /** - * optional string task_attempt_id_string = 1; + * optional string fragment_identifier_string = 1; */ - public Builder setTaskAttemptIdStringBytes( + public Builder setFragmentIdentifierStringBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000001; - taskAttemptIdString_ = value; + fragmentIdentifierString_ = value; 
onChanged(); return this; } @@ -10847,50 +10847,20 @@ public Builder mergeFrom( com.google.protobuf.ByteString getDagNameBytes(); - // optional int32 dag_attempt_number = 3; + // optional string fragment_identifier_string = 7; /** - * optional int32 dag_attempt_number = 3; + * optional string fragment_identifier_string = 7; */ - boolean hasDagAttemptNumber(); + boolean hasFragmentIdentifierString(); /** - * optional int32 dag_attempt_number = 3; + * optional string fragment_identifier_string = 7; */ - int getDagAttemptNumber(); - - // optional string vertex_name = 4; - /** - * optional string vertex_name = 4; - */ - boolean hasVertexName(); + java.lang.String getFragmentIdentifierString(); /** - * optional string vertex_name = 4; - */ - java.lang.String getVertexName(); - /** - * optional string vertex_name = 4; + * optional string fragment_identifier_string = 7; */ com.google.protobuf.ByteString - getVertexNameBytes(); - - // optional int32 fragment_number = 5; - /** - * optional int32 fragment_number = 5; - */ - boolean hasFragmentNumber(); - /** - * optional int32 fragment_number = 5; - */ - int getFragmentNumber(); - - // optional int32 attempt_number = 6; - /** - * optional int32 attempt_number = 6; - */ - boolean hasAttemptNumber(); - /** - * optional int32 attempt_number = 6; - */ - int getAttemptNumber(); + getFragmentIdentifierStringBytes(); } /** * Protobuf type {@code TerminateFragmentRequestProto} @@ -10953,24 +10923,9 @@ private TerminateFragmentRequestProto( dagName_ = input.readBytes(); break; } - case 24: { + case 58: { bitField0_ |= 0x00000004; - dagAttemptNumber_ = input.readInt32(); - break; - } - case 34: { - bitField0_ |= 0x00000008; - vertexName_ = input.readBytes(); - break; - } - case 40: { - bitField0_ |= 0x00000010; - fragmentNumber_ = input.readInt32(); - break; - } - case 48: { - bitField0_ |= 0x00000020; - attemptNumber_ = input.readInt32(); + fragmentIdentifierString_ = input.readBytes(); break; } } @@ -11099,36 +11054,20 @@ public 
boolean hasDagName() { } } - // optional int32 dag_attempt_number = 3; - public static final int DAG_ATTEMPT_NUMBER_FIELD_NUMBER = 3; - private int dagAttemptNumber_; + // optional string fragment_identifier_string = 7; + public static final int FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER = 7; + private java.lang.Object fragmentIdentifierString_; /** - * optional int32 dag_attempt_number = 3; + * optional string fragment_identifier_string = 7; */ - public boolean hasDagAttemptNumber() { + public boolean hasFragmentIdentifierString() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * optional int32 dag_attempt_number = 3; - */ - public int getDagAttemptNumber() { - return dagAttemptNumber_; - } - - // optional string vertex_name = 4; - public static final int VERTEX_NAME_FIELD_NUMBER = 4; - private java.lang.Object vertexName_; - /** - * optional string vertex_name = 4; + * optional string fragment_identifier_string = 7; */ - public boolean hasVertexName() { - return ((bitField0_ & 0x00000008) == 0x00000008); - } - /** - * optional string vertex_name = 4; - */ - public java.lang.String getVertexName() { - java.lang.Object ref = vertexName_; + public java.lang.String getFragmentIdentifierString() { + java.lang.Object ref = fragmentIdentifierString_; if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { @@ -11136,67 +11075,32 @@ public boolean hasVertexName() { (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { - vertexName_ = s; + fragmentIdentifierString_ = s; } return s; } } /** - * optional string vertex_name = 4; + * optional string fragment_identifier_string = 7; */ public com.google.protobuf.ByteString - getVertexNameBytes() { - java.lang.Object ref = vertexName_; + getFragmentIdentifierStringBytes() { + java.lang.Object ref = fragmentIdentifierString_; if (ref instanceof java.lang.String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( 
(java.lang.String) ref); - vertexName_ = b; + fragmentIdentifierString_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } - // optional int32 fragment_number = 5; - public static final int FRAGMENT_NUMBER_FIELD_NUMBER = 5; - private int fragmentNumber_; - /** - * optional int32 fragment_number = 5; - */ - public boolean hasFragmentNumber() { - return ((bitField0_ & 0x00000010) == 0x00000010); - } - /** - * optional int32 fragment_number = 5; - */ - public int getFragmentNumber() { - return fragmentNumber_; - } - - // optional int32 attempt_number = 6; - public static final int ATTEMPT_NUMBER_FIELD_NUMBER = 6; - private int attemptNumber_; - /** - * optional int32 attempt_number = 6; - */ - public boolean hasAttemptNumber() { - return ((bitField0_ & 0x00000020) == 0x00000020); - } - /** - * optional int32 attempt_number = 6; - */ - public int getAttemptNumber() { - return attemptNumber_; - } - private void initFields() { queryId_ = ""; dagName_ = ""; - dagAttemptNumber_ = 0; - vertexName_ = ""; - fragmentNumber_ = 0; - attemptNumber_ = 0; + fragmentIdentifierString_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -11217,16 +11121,7 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) output.writeBytes(2, getDagNameBytes()); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeInt32(3, dagAttemptNumber_); - } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - output.writeBytes(4, getVertexNameBytes()); - } - if (((bitField0_ & 0x00000010) == 0x00000010)) { - output.writeInt32(5, fragmentNumber_); - } - if (((bitField0_ & 0x00000020) == 0x00000020)) { - output.writeInt32(6, attemptNumber_); + output.writeBytes(7, getFragmentIdentifierStringBytes()); } getUnknownFields().writeTo(output); } @@ -11247,19 +11142,7 @@ public int getSerializedSize() { } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream - .computeInt32Size(3, 
dagAttemptNumber_); - } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(4, getVertexNameBytes()); - } - if (((bitField0_ & 0x00000010) == 0x00000010)) { - size += com.google.protobuf.CodedOutputStream - .computeInt32Size(5, fragmentNumber_); - } - if (((bitField0_ & 0x00000020) == 0x00000020)) { - size += com.google.protobuf.CodedOutputStream - .computeInt32Size(6, attemptNumber_); + .computeBytesSize(7, getFragmentIdentifierStringBytes()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -11294,25 +11177,10 @@ public boolean equals(final java.lang.Object obj) { result = result && getDagName() .equals(other.getDagName()); } - result = result && (hasDagAttemptNumber() == other.hasDagAttemptNumber()); - if (hasDagAttemptNumber()) { - result = result && (getDagAttemptNumber() - == other.getDagAttemptNumber()); - } - result = result && (hasVertexName() == other.hasVertexName()); - if (hasVertexName()) { - result = result && getVertexName() - .equals(other.getVertexName()); - } - result = result && (hasFragmentNumber() == other.hasFragmentNumber()); - if (hasFragmentNumber()) { - result = result && (getFragmentNumber() - == other.getFragmentNumber()); - } - result = result && (hasAttemptNumber() == other.hasAttemptNumber()); - if (hasAttemptNumber()) { - result = result && (getAttemptNumber() - == other.getAttemptNumber()); + result = result && (hasFragmentIdentifierString() == other.hasFragmentIdentifierString()); + if (hasFragmentIdentifierString()) { + result = result && getFragmentIdentifierString() + .equals(other.getFragmentIdentifierString()); } result = result && getUnknownFields().equals(other.getUnknownFields()); @@ -11335,21 +11203,9 @@ public int hashCode() { hash = (37 * hash) + DAG_NAME_FIELD_NUMBER; hash = (53 * hash) + getDagName().hashCode(); } - if (hasDagAttemptNumber()) { - hash = (37 * hash) + DAG_ATTEMPT_NUMBER_FIELD_NUMBER; - hash = (53 * 
hash) + getDagAttemptNumber(); - } - if (hasVertexName()) { - hash = (37 * hash) + VERTEX_NAME_FIELD_NUMBER; - hash = (53 * hash) + getVertexName().hashCode(); - } - if (hasFragmentNumber()) { - hash = (37 * hash) + FRAGMENT_NUMBER_FIELD_NUMBER; - hash = (53 * hash) + getFragmentNumber(); - } - if (hasAttemptNumber()) { - hash = (37 * hash) + ATTEMPT_NUMBER_FIELD_NUMBER; - hash = (53 * hash) + getAttemptNumber(); + if (hasFragmentIdentifierString()) { + hash = (37 * hash) + FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER; + hash = (53 * hash) + getFragmentIdentifierString().hashCode(); } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; @@ -11464,14 +11320,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000001); dagName_ = ""; bitField0_ = (bitField0_ & ~0x00000002); - dagAttemptNumber_ = 0; + fragmentIdentifierString_ = ""; bitField0_ = (bitField0_ & ~0x00000004); - vertexName_ = ""; - bitField0_ = (bitField0_ & ~0x00000008); - fragmentNumber_ = 0; - bitField0_ = (bitField0_ & ~0x00000010); - attemptNumber_ = 0; - bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -11511,19 +11361,7 @@ public Builder clone() { if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - result.dagAttemptNumber_ = dagAttemptNumber_; - if (((from_bitField0_ & 0x00000008) == 0x00000008)) { - to_bitField0_ |= 0x00000008; - } - result.vertexName_ = vertexName_; - if (((from_bitField0_ & 0x00000010) == 0x00000010)) { - to_bitField0_ |= 0x00000010; - } - result.fragmentNumber_ = fragmentNumber_; - if (((from_bitField0_ & 0x00000020) == 0x00000020)) { - to_bitField0_ |= 0x00000020; - } - result.attemptNumber_ = attemptNumber_; + result.fragmentIdentifierString_ = fragmentIdentifierString_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -11550,20 +11388,11 @@ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtoc dagName_ = other.dagName_; onChanged(); } - if 
(other.hasDagAttemptNumber()) { - setDagAttemptNumber(other.getDagAttemptNumber()); - } - if (other.hasVertexName()) { - bitField0_ |= 0x00000008; - vertexName_ = other.vertexName_; + if (other.hasFragmentIdentifierString()) { + bitField0_ |= 0x00000004; + fragmentIdentifierString_ = other.fragmentIdentifierString_; onChanged(); } - if (other.hasFragmentNumber()) { - setFragmentNumber(other.getFragmentNumber()); - } - if (other.hasAttemptNumber()) { - setAttemptNumber(other.getAttemptNumber()); - } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -11739,175 +11568,76 @@ public Builder setDagNameBytes( return this; } - // optional int32 dag_attempt_number = 3; - private int dagAttemptNumber_ ; + // optional string fragment_identifier_string = 7; + private java.lang.Object fragmentIdentifierString_ = ""; /** - * optional int32 dag_attempt_number = 3; + * optional string fragment_identifier_string = 7; */ - public boolean hasDagAttemptNumber() { + public boolean hasFragmentIdentifierString() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * optional int32 dag_attempt_number = 3; - */ - public int getDagAttemptNumber() { - return dagAttemptNumber_; - } - /** - * optional int32 dag_attempt_number = 3; + * optional string fragment_identifier_string = 7; */ - public Builder setDagAttemptNumber(int value) { - bitField0_ |= 0x00000004; - dagAttemptNumber_ = value; - onChanged(); - return this; - } - /** - * optional int32 dag_attempt_number = 3; - */ - public Builder clearDagAttemptNumber() { - bitField0_ = (bitField0_ & ~0x00000004); - dagAttemptNumber_ = 0; - onChanged(); - return this; - } - - // optional string vertex_name = 4; - private java.lang.Object vertexName_ = ""; - /** - * optional string vertex_name = 4; - */ - public boolean hasVertexName() { - return ((bitField0_ & 0x00000008) == 0x00000008); - } - /** - * optional string vertex_name = 4; - */ - public java.lang.String getVertexName() { - java.lang.Object ref = vertexName_; + 
public java.lang.String getFragmentIdentifierString() { + java.lang.Object ref = fragmentIdentifierString_; if (!(ref instanceof java.lang.String)) { java.lang.String s = ((com.google.protobuf.ByteString) ref) .toStringUtf8(); - vertexName_ = s; + fragmentIdentifierString_ = s; return s; } else { return (java.lang.String) ref; } } /** - * optional string vertex_name = 4; + * optional string fragment_identifier_string = 7; */ public com.google.protobuf.ByteString - getVertexNameBytes() { - java.lang.Object ref = vertexName_; + getFragmentIdentifierStringBytes() { + java.lang.Object ref = fragmentIdentifierString_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - vertexName_ = b; + fragmentIdentifierString_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** - * optional string vertex_name = 4; + * optional string fragment_identifier_string = 7; */ - public Builder setVertexName( + public Builder setFragmentIdentifierString( java.lang.String value) { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000008; - vertexName_ = value; + bitField0_ |= 0x00000004; + fragmentIdentifierString_ = value; onChanged(); return this; } /** - * optional string vertex_name = 4; + * optional string fragment_identifier_string = 7; */ - public Builder clearVertexName() { - bitField0_ = (bitField0_ & ~0x00000008); - vertexName_ = getDefaultInstance().getVertexName(); + public Builder clearFragmentIdentifierString() { + bitField0_ = (bitField0_ & ~0x00000004); + fragmentIdentifierString_ = getDefaultInstance().getFragmentIdentifierString(); onChanged(); return this; } /** - * optional string vertex_name = 4; + * optional string fragment_identifier_string = 7; */ - public Builder setVertexNameBytes( + public Builder setFragmentIdentifierStringBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } - 
bitField0_ |= 0x00000008; - vertexName_ = value; - onChanged(); - return this; - } - - // optional int32 fragment_number = 5; - private int fragmentNumber_ ; - /** - * optional int32 fragment_number = 5; - */ - public boolean hasFragmentNumber() { - return ((bitField0_ & 0x00000010) == 0x00000010); - } - /** - * optional int32 fragment_number = 5; - */ - public int getFragmentNumber() { - return fragmentNumber_; - } - /** - * optional int32 fragment_number = 5; - */ - public Builder setFragmentNumber(int value) { - bitField0_ |= 0x00000010; - fragmentNumber_ = value; - onChanged(); - return this; - } - /** - * optional int32 fragment_number = 5; - */ - public Builder clearFragmentNumber() { - bitField0_ = (bitField0_ & ~0x00000010); - fragmentNumber_ = 0; - onChanged(); - return this; - } - - // optional int32 attempt_number = 6; - private int attemptNumber_ ; - /** - * optional int32 attempt_number = 6; - */ - public boolean hasAttemptNumber() { - return ((bitField0_ & 0x00000020) == 0x00000020); - } - /** - * optional int32 attempt_number = 6; - */ - public int getAttemptNumber() { - return attemptNumber_; - } - /** - * optional int32 attempt_number = 6; - */ - public Builder setAttemptNumber(int value) { - bitField0_ |= 0x00000020; - attemptNumber_ = value; - onChanged(); - return this; - } - /** - * optional int32 attempt_number = 6; - */ - public Builder clearAttemptNumber() { - bitField0_ = (bitField0_ & ~0x00000020); - attemptNumber_ = 0; + bitField0_ |= 0x00000004; + fragmentIdentifierString_ = value; onChanged(); return this; } @@ -12796,52 +12526,51 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { "roupInputSpecProto\022\022\n\ngroup_name\030\001 \001(\t\022\026" + "\n\016group_vertices\030\002 \003(\t\0227\n\027merged_input_d", "escriptor\030\003 \001(\0132\026.EntityDescriptorProto\"" + - "\327\002\n\021FragmentSpecProto\022\036\n\026task_attempt_id" + - "_string\030\001 \001(\t\022\020\n\010dag_name\030\002 
\001(\t\022\023\n\013verte" + - "x_name\030\003 \001(\t\0224\n\024processor_descriptor\030\004 \001" + - "(\0132\026.EntityDescriptorProto\022!\n\013input_spec" + - "s\030\005 \003(\0132\014.IOSpecProto\022\"\n\014output_specs\030\006 " + - "\003(\0132\014.IOSpecProto\0221\n\023grouped_input_specs" + - "\030\007 \003(\0132\024.GroupInputSpecProto\022\032\n\022vertex_p" + - "arallelism\030\010 \001(\005\022\027\n\017fragment_number\030\t \001(" + - "\005\022\026\n\016attempt_number\030\n \001(\005\"\344\001\n\023FragmentRu", - "ntimeInfo\022#\n\033num_self_and_upstream_tasks" + - "\030\001 \001(\005\022-\n%num_self_and_upstream_complete" + - "d_tasks\030\002 \001(\005\022\033\n\023within_dag_priority\030\003 \001" + - "(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030first_atte" + - "mpt_start_time\030\005 \001(\003\022\"\n\032current_attempt_" + - "start_time\030\006 \001(\003\"\266\002\n\026SubmitWorkRequestPr" + - "oto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007am_h" + - "ost\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_iden" + - "tifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005 \001(\014" + - "\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_string\030", - "\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n\rfra" + - "gment_spec\030\t \001(\0132\022.FragmentSpecProto\0223\n\025" + - "fragment_runtime_info\030\n \001(\0132\024.FragmentRu" + - "ntimeInfo\"\031\n\027SubmitWorkResponseProto\"f\n\036" + - "SourceStateUpdatedRequestProto\022\020\n\010dag_na" + - "me\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(" + - "\0162\021.SourceStateProto\"!\n\037SourceStateUpdat" + - "edResponseProto\"X\n\031QueryCompleteRequestP" + - "roto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t" + - "\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryComple", - 
"teResponseProto\"\245\001\n\035TerminateFragmentReq" + - "uestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030" + - "\002 \001(\t\022\032\n\022dag_attempt_number\030\003 \001(\005\022\023\n\013ver" + - "tex_name\030\004 \001(\t\022\027\n\017fragment_number\030\005 \001(\005\022" + - "\026\n\016attempt_number\030\006 \001(\005\" \n\036TerminateFrag" + - "mentResponseProto*2\n\020SourceStateProto\022\017\n" + - "\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022LlapDa" + - "emonProtocol\022?\n\nsubmitWork\022\027.SubmitWorkR" + - "equestProto\032\030.SubmitWorkResponseProto\022W\n" + - "\022sourceStateUpdated\022\037.SourceStateUpdated", - "RequestProto\032 .SourceStateUpdatedRespons" + - "eProto\022H\n\rqueryComplete\022\032.QueryCompleteR" + - "equestProto\032\033.QueryCompleteResponseProto" + - "\022T\n\021terminateFragment\022\036.TerminateFragmen" + - "tRequestProto\032\037.TerminateFragmentRespons" + - "eProtoBH\n&org.apache.hadoop.hive.llap.da" + - "emon.rpcB\030LlapDaemonProtocolProtos\210\001\001\240\001\001" + "\333\002\n\021FragmentSpecProto\022\"\n\032fragment_identi" + + "fier_string\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t\022\023\n\013v" + + "ertex_name\030\003 \001(\t\0224\n\024processor_descriptor" + + "\030\004 \001(\0132\026.EntityDescriptorProto\022!\n\013input_" + + "specs\030\005 \003(\0132\014.IOSpecProto\022\"\n\014output_spec" + + "s\030\006 \003(\0132\014.IOSpecProto\0221\n\023grouped_input_s" + + "pecs\030\007 \003(\0132\024.GroupInputSpecProto\022\032\n\022vert" + + "ex_parallelism\030\010 \001(\005\022\027\n\017fragment_number\030" + + "\t \001(\005\022\026\n\016attempt_number\030\n \001(\005\"\344\001\n\023Fragme", + "ntRuntimeInfo\022#\n\033num_self_and_upstream_t" + + "asks\030\001 \001(\005\022-\n%num_self_and_upstream_comp" + + "leted_tasks\030\002 \001(\005\022\033\n\023within_dag_priority" + + "\030\003 \001(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030first_" + + 
"attempt_start_time\030\005 \001(\003\022\"\n\032current_atte" + + "mpt_start_time\030\006 \001(\003\"\266\002\n\026SubmitWorkReque" + + "stProto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007" + + "am_host\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_" + + "identifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005" + + " \001(\014\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_str", + "ing\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n" + + "\rfragment_spec\030\t \001(\0132\022.FragmentSpecProto" + + "\0223\n\025fragment_runtime_info\030\n \001(\0132\024.Fragme" + + "ntRuntimeInfo\"\031\n\027SubmitWorkResponseProto" + + "\"f\n\036SourceStateUpdatedRequestProto\022\020\n\010da" + + "g_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030" + + "\003 \001(\0162\021.SourceStateProto\"!\n\037SourceStateU" + + "pdatedResponseProto\"X\n\031QueryCompleteRequ" + + "estProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002" + + " \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryCo", + "mpleteResponseProto\"g\n\035TerminateFragment" + + "RequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_na" + + "me\030\002 \001(\t\022\"\n\032fragment_identifier_string\030\007" + + " \001(\t\" \n\036TerminateFragmentResponseProto*2" + + "\n\020SourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS" + + "_RUNNING\020\0022\316\002\n\022LlapDaemonProtocol\022?\n\nsub" + + "mitWork\022\027.SubmitWorkRequestProto\032\030.Submi" + + "tWorkResponseProto\022W\n\022sourceStateUpdated" + + "\022\037.SourceStateUpdatedRequestProto\032 .Sour" + + "ceStateUpdatedResponseProto\022H\n\rqueryComp", + "lete\022\032.QueryCompleteRequestProto\032\033.Query" + + "CompleteResponseProto\022T\n\021terminateFragme" + + "nt\022\036.TerminateFragmentRequestProto\032\037.Ter" + + "minateFragmentResponseProtoBH\n&org.apach" + + 
"e.hadoop.hive.llap.daemon.rpcB\030LlapDaemo" + + "nProtocolProtos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -12877,7 +12606,7 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { internal_static_FragmentSpecProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_FragmentSpecProto_descriptor, - new java.lang.String[] { "TaskAttemptIdString", "DagName", "VertexName", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", "FragmentNumber", "AttemptNumber", }); + new java.lang.String[] { "FragmentIdentifierString", "DagName", "VertexName", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", "FragmentNumber", "AttemptNumber", }); internal_static_FragmentRuntimeInfo_descriptor = getDescriptor().getMessageTypes().get(5); internal_static_FragmentRuntimeInfo_fieldAccessorTable = new @@ -12925,7 +12654,7 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { internal_static_TerminateFragmentRequestProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_TerminateFragmentRequestProto_descriptor, - new java.lang.String[] { "QueryId", "DagName", "DagAttemptNumber", "VertexName", "FragmentNumber", "AttemptNumber", }); + new java.lang.String[] { "QueryId", "DagName", "FragmentIdentifierString", }); internal_static_TerminateFragmentResponseProto_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_TerminateFragmentResponseProto_fieldAccessorTable = new diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index ea2d77a..1620ddf 100644 --- 
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -101,6 +101,7 @@ public AMReporter(LlapNodeId nodeId, Configuration conf) { this.heartbeatInterval = conf.getLong(LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS, LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT); + LOG.info("AMReporter running with NodeId: {}", nodeId); } @Override @@ -164,7 +165,7 @@ public void unregisterTask(String amLocation, int port) { synchronized (knownAppMasters) { amNodeInfo = knownAppMasters.get(amNodeId); if (amNodeInfo == null) { - LOG.error(("Ignoring unexpected unregisterRequest for am at: " + amLocation + ":" + port)); + LOG.info(("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port)); } amNodeInfo.decrementAndGetTaskCount(); // Not removing this here. Will be removed when taken off the queue and discovered to have 0 diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index d594d6a..2f2ccb0 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -45,7 +45,6 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; -import org.apache.log4j.Logger; import org.apache.log4j.NDC; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; @@ -55,11 +54,16 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; // TODO Convert this to a CompositeService public class ContainerRunnerImpl extends 
AbstractService implements ContainerRunner, FragmentCompletionHandler { - private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class); + // TODO Setup a set of threads to process incoming requests. + // Make sure requests for a single dag/query are handled by the same thread + + private static final Logger LOG = LoggerFactory.getLogger(ContainerRunnerImpl.class); public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor "; private volatile AMReporter amReporter; @@ -143,12 +147,7 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { LOG.info("Queueing container for execution: " + stringifySubmitRequest(request)); // This is the start of container-annotated logging. // TODO Reduce the length of this string. Way too verbose at the moment. - String ndcContextString = - request.getContainerIdString() + "_" + - request.getFragmentSpec().getDagName() + "_" + - request.getFragmentSpec().getVertexName() + - "_" + request.getFragmentSpec().getFragmentNumber() + "_" + - request.getFragmentSpec().getAttemptNumber(); + String ndcContextString = request.getFragmentSpec().getFragmentIdentifierString(); NDC.push(ndcContextString); try { Map env = new HashMap<>(); @@ -158,7 +157,7 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { FragmentSpecProto fragmentSpec = request.getFragmentSpec(); TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString( - fragmentSpec.getTaskAttemptIdString()); + fragmentSpec.getFragmentIdentifierString()); int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); QueryFragmentInfo fragmentInfo = queryTracker @@ -222,7 +221,8 @@ public void queryComplete(QueryCompleteRequestProto request) { @Override public void terminateFragment(TerminateFragmentRequestProto request) { - // TODO Implement when this gets used. 
+ LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString()); + executorService.killFragment(request.getFragmentIdentifierString()); } private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) { @@ -235,15 +235,15 @@ private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto public static String stringifySubmitRequest(SubmitWorkRequestProto request) { StringBuilder sb = new StringBuilder(); + FragmentSpecProto fragmentSpec = request.getFragmentSpec(); sb.append("am_details=").append(request.getAmHost()).append(":").append(request.getAmPort()); + sb.append(", taskInfo=").append(fragmentSpec.getFragmentIdentifierString()); sb.append(", user=").append(request.getUser()); sb.append(", appIdString=").append(request.getApplicationIdString()); sb.append(", appAttemptNum=").append(request.getAppAttemptNumber()); sb.append(", containerIdString=").append(request.getContainerIdString()); - FragmentSpecProto fragmentSpec = request.getFragmentSpec(); sb.append(", dagName=").append(fragmentSpec.getDagName()); sb.append(", vertexName=").append(fragmentSpec.getVertexName()); - sb.append(", taskInfo=").append(fragmentSpec.getTaskAttemptIdString()); sb.append(", processor=").append(fragmentSpec.getProcessorDescriptor().getClassName()); sb.append(", numInputs=").append(fragmentSpec.getInputSpecsCount()); sb.append(", numOutputs=").append(fragmentSpec.getOutputSpecsCount()); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java index c3102f9..eb06a2f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java @@ -30,4 +30,10 @@ * @throws RejectedExecutionException */ void schedule(T t) throws RejectedExecutionException; + + /** + * Attempt to kill the fragment with the specified 
fragmentId + * @param fragmentId + */ + void killFragment(String fragmentId); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index bfc4d89..453a71e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -17,11 +17,10 @@ */ package org.apache.hadoop.hive.llap.daemon.impl; -import java.util.Collections; import java.util.Comparator; -import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; @@ -29,6 +28,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -74,6 +74,8 @@ private static final String TASK_EXECUTOR_THREAD_NAME_FORMAT = "Task-Executor-%d"; private static final String WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT = "Wait-Queue-Scheduler-%d"; + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + // Thread pool for actual execution of work. private final ListeningExecutorService executorService; private final EvictingPriorityBlockingQueue waitQueue; @@ -87,7 +89,7 @@ private final AtomicInteger numSlotsAvailable; // Tracks known tasks. 
- private final Set knownTasks = Collections.newSetFromMap(new ConcurrentHashMap()); + private final ConcurrentMap knownTasks = new ConcurrentHashMap<>(); private final Object lock = new Object(); @@ -131,27 +133,32 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePr @Override public void run() { + try { - synchronized (lock) { - while (waitQueue.isEmpty()) { - lock.wait(); - } - } - // Since schedule() can be called from multiple threads, we peek the wait queue, - // try scheduling the task and then remove the task if scheduling is successful. - // This will make sure the task's place in the wait queue is held until it gets scheduled. - while (true) { + while (!isShutdown.get()) { synchronized (lock) { + // Since schedule() can be called from multiple threads, we peek the wait queue, + // try scheduling the task and then remove the task if scheduling is successful. + // This will make sure the task's place in the wait queue is held until it gets scheduled. task = waitQueue.peek(); if (task == null) { - break; + if (!isShutdown.get()) { + lock.wait(); + } + continue; } - // if the task cannot finish and if no slots are available then don't schedule it. + // if the task cannot finish and if no slots are available then don't schedule it. boolean shouldWait = false; if (task.getTaskRunnerCallable().canFinish()) { + if (isDebugEnabled) { + LOG.debug( + "Attempting to schedule task {}, canFinish={}. Current state: preemptionQueueSize={}, numSlotsAvailable={}", + task.getRequestId(), task.getTaskRunnerCallable().canFinish(), + preemptionQueue.size(), numSlotsAvailable.get()); + } if (numSlotsAvailable.get() == 0 && preemptionQueue.isEmpty()) { shouldWait = true; } @@ -161,7 +168,9 @@ public void run() { } } if (shouldWait) { - lock.wait(); + if (!isShutdown.get()) { + lock.wait(); + } // Another task at a higher priority may have come in during the wait. Lookup the // queue again to pick up the task at the highest priority. 
continue; @@ -179,15 +188,20 @@ public void run() { synchronized (lock) { while (waitQueue.isEmpty()) { - lock.wait(); + if (!isShutdown.get()) { + lock.wait(); + } } } } } catch (InterruptedException e) { - // Executor service will create new thread if the current thread gets interrupted. We don't - // need to do anything with the exception. - LOG.info(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " thread has been interrupted."); + if (isShutdown.get()) { + LOG.info(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " thread has been interrupted after shutdown."); + } else { + LOG.warn(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " interrupted without shutdown", e); + throw new RuntimeException(e); + } } } } @@ -207,24 +221,23 @@ public void onFailure(Throwable t) { @Override public void schedule(TaskRunnerCallable task) throws RejectedExecutionException { - TaskWrapper taskWrapper = new TaskWrapper(task); - knownTasks.add(taskWrapper); + TaskWrapper taskWrapper = new TaskWrapper(task, this); + knownTasks.put(taskWrapper.getRequestId(), taskWrapper); TaskWrapper evictedTask; try { // Don't need a lock. Not subscribed for notifications yet, and marked as inWaitQueue evictedTask = waitQueue.offer(taskWrapper); } catch (RejectedExecutionException e) { - knownTasks.remove(taskWrapper); + knownTasks.remove(taskWrapper.getRequestId()); throw e; } - if (evictedTask == null) { - if (isInfoEnabled) { - LOG.info(task.getRequestId() + " added to wait queue."); - } - if (isDebugEnabled) { - LOG.debug("Wait Queue: {}", waitQueue); - } - } else { + if (isInfoEnabled) { + LOG.info("{} added to wait queue. 
Current wait queue size={}", task.getRequestId(), waitQueue.size()); + } + if (isDebugEnabled) { + LOG.debug("Wait Queue: {}", waitQueue); + } + if (evictedTask != null) { evictedTask.maybeUnregisterForFinishedStateNotifications(); evictedTask.getTaskRunnerCallable().killTask(); if (isInfoEnabled) { @@ -237,13 +250,41 @@ public void schedule(TaskRunnerCallable task) throws RejectedExecutionException } } + @Override + public void killFragment(String fragmentId) { + synchronized (lock) { + TaskWrapper taskWrapper = knownTasks.remove(fragmentId); + // Can be null since the task may have completed meanwhile. + if (taskWrapper != null) { + if (taskWrapper.inWaitQueue) { + if (isDebugEnabled) { + LOG.debug("Removing {} from waitQueue", fragmentId); + } + taskWrapper.setIsInWaitQueue(false); + waitQueue.remove(taskWrapper); + } + if (taskWrapper.inPreemptionQueue) { + if (isDebugEnabled) { + LOG.debug("Removing {} from preemptionQueue", fragmentId); + } + taskWrapper.setIsInPreemptableQueue(false); + preemptionQueue.remove(taskWrapper); + } + taskWrapper.getTaskRunnerCallable().killTask(); + } else { + LOG.info("Ignoring killFragment request for {} since it isn't known", fragmentId); + } + lock.notify(); + } + } + private boolean trySchedule(final TaskWrapper taskWrapper) { boolean scheduled = false; try { synchronized (lock) { boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); - boolean stateChanged = taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); + boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); ListenableFuture future = executorService.submit(taskWrapper.getTaskRunnerCallable()); taskWrapper.setIsInWaitQueue(false); FutureCallback wrappedCallback = new InternalCompletionListener(taskWrapper); @@ -357,7 +398,7 @@ public InternalCompletionListener(TaskWrapper taskWrapper) { @Override public void onSuccess(TaskRunner2Result result) { - knownTasks.remove(taskWrapper); + 
knownTasks.remove(taskWrapper.getRequestId()); taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result); @@ -366,7 +407,7 @@ public void onSuccess(TaskRunner2Result result) { @Override public void onFailure(Throwable t) { - knownTasks.remove(taskWrapper); + knownTasks.remove(taskWrapper.getRequestId()); taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t); @@ -387,6 +428,9 @@ private void updatePreemptionListAndNotify(EndReason reason) { } numSlotsAvailable.incrementAndGet(); + LOG.info("Task {} complete. WaitQueueSize={}, numSlotsAvailable={}, preemptionQueueSize={}", + taskWrapper.getRequestId(), waitQueue.size(), numSlotsAvailable.get(), + preemptionQueue.size()); synchronized (lock) { if (!waitQueue.isEmpty()) { lock.notify(); @@ -398,21 +442,23 @@ private void updatePreemptionListAndNotify(EndReason reason) { // TODO: llap daemon should call this to gracefully shutdown the task executor service public void shutDown(boolean awaitTermination) { - if (awaitTermination) { - if (isDebugEnabled) { - LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + - " service gracefully"); - } - shutdownExecutor(waitQueueExecutorService); - shutdownExecutor(executorService); - shutdownExecutor(executionCompletionExecutorService); - } else { - if (isDebugEnabled) { - LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + - " service immediately"); + if (!isShutdown.getAndSet(true)) { + if (awaitTermination) { + if (isDebugEnabled) { + LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + + " service gracefully"); + } + shutdownExecutor(waitQueueExecutorService); + shutdownExecutor(executorService); + shutdownExecutor(executionCompletionExecutorService); + } else { 
+ if (isDebugEnabled) { + LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + + " service immediately"); + } + executorService.shutdownNow(); + waitQueueExecutorService.shutdownNow(); } - executorService.shutdownNow(); - waitQueueExecutorService.shutdownNow(); } } @@ -474,14 +520,16 @@ public int compare(TaskWrapper t1, TaskWrapper t2) { } - private class TaskWrapper implements FinishableStateUpdateHandler { + public static class TaskWrapper implements FinishableStateUpdateHandler { private final TaskRunnerCallable taskRunnerCallable; private boolean inWaitQueue = true; private boolean inPreemptionQueue = false; private boolean registeredForNotifications = false; + private final TaskExecutorService taskExecutorService; - public TaskWrapper(TaskRunnerCallable taskRunnerCallable) { + public TaskWrapper(TaskRunnerCallable taskRunnerCallable, TaskExecutorService taskExecutorService) { this.taskRunnerCallable = taskRunnerCallable; + this.taskExecutorService = taskExecutorService; } // Methods are synchronized primarily for visibility. @@ -548,7 +596,7 @@ public void finishableStateUpdated(boolean finishableState) { // Meanwhile the scheduler could try updating states via a synchronized method. 
LOG.info("DEBUG: Received finishable state update for {}, state={}", taskRunnerCallable.getRequestId(), finishableState); - TaskExecutorService.this.finishableStateUpdated(this, finishableState); + taskExecutorService.finishableStateUpdated(this, finishableState); } } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 007c83d..1c12e12 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -98,6 +98,7 @@ private boolean shouldRunTask = true; final Stopwatch runtimeWatch = new Stopwatch(); final Stopwatch killtimerWatch = new Stopwatch(); + private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isCompleted = new AtomicBoolean(false); private final AtomicBoolean killInvoked = new AtomicBoolean(false); @@ -127,13 +128,15 @@ request.getUser(), jobToken); } this.metrics = metrics; - this.requestId = getTaskAttemptId(request); + this.requestId = getRequestId(request); this.killedTaskHandler = killedTaskHandler; this.fragmentCompletionHanler = fragmentCompleteHandler; } @Override protected TaskRunner2Result callInternal() throws Exception { + isStarted.set(true); + this.startTime = System.currentTimeMillis(); this.threadName = Thread.currentThread().getName(); if (LOG.isDebugEnabled()) { @@ -143,12 +146,19 @@ protected TaskRunner2Result callInternal() throws Exception { // Unregister from the AMReporter, since the task is now running. this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort()); + synchronized (this) { + if (!shouldRunTask) { + LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID()); + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); + } + } + // TODO This executor seems unnecessary. 
Here and TezChild ExecutorService executorReal = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat( - "TezTaskRunner_" + request.getFragmentSpec().getTaskAttemptIdString()) + "TezTaskRunner_" + request.getFragmentSpec().getFragmentIdentifierString()) .build()); executor = MoreExecutors.listeningDecorator(executorReal); @@ -244,6 +254,16 @@ public void killTask() { taskSpec.getTaskAttemptID()); } shouldRunTask = false; + } else { + // If the task hasn't started, and it is killed - report back to the AM that the task has been killed. + LOG.info("DBG: Reporting taskKilled for non-started fragment {}", getRequestId()); + reportTaskKilled(); + } + if (!isStarted.get()) { + // If the task hasn't started - inform about fragment completion immediately. It's possible for + // the callable to never run. + fragmentCompletionHanler.fragmentComplete(fragmentInfo); + this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort()); } } } else { @@ -360,7 +380,7 @@ public void onSuccess(TaskRunner2Result result) { metrics.incrExecutorTotalSuccess(); break; case CONTAINER_STOP_REQUESTED: - LOG.warn("Unexpected CONTAINER_STOP_REQUEST for {}", requestId); + LOG.info("Received container stop request (AM preemption) for {}", requestId); break; case KILL_REQUESTED: LOG.info("Killed task {}", requestId); @@ -439,8 +459,8 @@ public static String getTaskIdentifierString( return sb.toString(); } - private String getTaskAttemptId(SubmitWorkRequestProto request) { - return request.getFragmentSpec().getTaskAttemptIdString(); + private static String getRequestId(SubmitWorkRequestProto request) { + return request.getFragmentSpec().getFragmentIdentifierString(); } public long getFirstAttemptStartTime() { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java index 5bd1fe9..7428a6a 100644 --- 
llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java @@ -43,7 +43,7 @@ public static TaskSpec getTaskSpecfromProto(FragmentSpecProto FragmentSpecProto) { TezTaskAttemptID taskAttemptID = - TezTaskAttemptID.fromString(FragmentSpecProto.getTaskAttemptIdString()); + TezTaskAttemptID.fromString(FragmentSpecProto.getFragmentIdentifierString()); ProcessorDescriptor processorDescriptor = null; if (FragmentSpecProto.hasProcessorDescriptor()) { @@ -83,7 +83,7 @@ public static TaskSpec getTaskSpecfromProto(FragmentSpecProto FragmentSpecProto) public static FragmentSpecProto convertTaskSpecToProto(TaskSpec taskSpec) { FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder(); - builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString()); + builder.setFragmentIdentifierString(taskSpec.getTaskAttemptID().toString()); builder.setDagName(taskSpec.getDAGName()); builder.setVertexName(taskSpec.getVertexName()); builder.setVertexParallelism(taskSpec.getVertexParallelism()); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 6abd706..6a38d85 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker; import org.apache.hadoop.io.DataOutputBuffer; @@ -52,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.ContainerEndReason; import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TezConfiguration; @@ -172,8 +175,15 @@ public void registerRunningContainer(ContainerId containerId, String hostname, i } @Override - public void registerContainerEnd(ContainerId containerId) { - super.registerContainerEnd(containerId); + public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) { + super.registerContainerEnd(containerId, endReason); + if (endReason == ContainerEndReason.INTERNAL_PREEMPTION) { + LOG.info("Processing containerEnd for container {} caused by internal preemption", containerId); + TezTaskAttemptID taskAttemptId = entityTracker.getTaskAttemptIdForContainer(containerId); + if (taskAttemptId != null) { + sendTaskTerminated(taskAttemptId, true); + } + } entityTracker.unregisterContainer(containerId); } @@ -224,7 +234,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them. 
getTaskCommunicatorContext() .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId); - communicator.submitWork(requestProto, host, port, + communicator.sendSubmitWork(requestProto, host, port, new TaskCommunicator.ExecuteRequestCallback() { @Override public void setResponse(SubmitWorkResponseProto response) { @@ -238,7 +248,7 @@ public void indicateError(Throwable t) { t = se.getCause(); } if (t instanceof RemoteException) { - RemoteException re = (RemoteException)t; + RemoteException re = (RemoteException) t; String message = re.toString(); // RejectedExecutions from the remote service treated as KILLED if (message.contains(RejectedExecutionException.class.getName())) { @@ -249,7 +259,9 @@ public void indicateError(Throwable t) { TaskAttemptEndReason.SERVICE_BUSY, "Service Busy"); } else { // All others from the remote service cause the task to FAIL. - LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t); + LOG.info( + "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + + containerId, t); getTaskCommunicatorContext() .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, t.toString()); @@ -264,7 +276,9 @@ public void indicateError(Throwable t) { TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error"); } else { // Anything else is a FAIL. 
- LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t); + LOG.info( + "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + + containerId, t); getTaskCommunicatorContext() .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, t.getMessage()); @@ -275,14 +289,50 @@ public void indicateError(Throwable t) { } @Override - public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) { - super.unregisterRunningTaskAttempt(taskAttemptID); - entityTracker.unregisterTaskAttempt(taskAttemptID); + public void unregisterRunningTaskAttempt(final TezTaskAttemptID taskAttemptId, + TaskAttemptEndReason endReason) { + super.unregisterRunningTaskAttempt(taskAttemptId, endReason); + + if (endReason == TaskAttemptEndReason.INTERNAL_PREEMPTION) { + LOG.info("Processing taskEnd for task {} caused by internal preemption", taskAttemptId); + sendTaskTerminated(taskAttemptId, false); + } + entityTracker.unregisterTaskAttempt(taskAttemptId); // This will also be invoked for tasks which have been KILLED / rejected by the daemon. // Informing the daemon becomes necessary once the LlapScheduler supports preemption // and/or starts attempting to kill tasks which may be running on a node. } + private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId, + boolean invokedByContainerEnd) { + LOG.info( + "DBG: Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}", + taskAttemptId.toString(), invokedByContainerEnd ? 
"containerEnd" : "taskEnd"); + LlapNodeId nodeId = entityTracker.getNodeIfForTaskAttempt(taskAttemptId); + // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself + if (nodeId != null) { + TerminateFragmentRequestProto request = + TerminateFragmentRequestProto.newBuilder().setDagName(currentDagName) + .setFragmentIdentifierString(taskAttemptId.toString()).build(); + communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(), + new TaskCommunicator.ExecuteRequestCallback() { + @Override + public void setResponse(TerminateFragmentResponseProto response) { + } + + @Override + public void indicateError(Throwable t) { + LOG.warn("Failed to send terminate fragment request for {}", + taskAttemptId.toString()); + } + }); + } else { + LOG.info( + "Not sending terminate request for fragment {} since it's node is not known. Already unregistered", + taskAttemptId.toString()); + } + } + @Override public void dagComplete(final String dagName) { QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder().setDagName( @@ -410,7 +460,7 @@ public void nodeHeartbeat(Text hostname, int port) { public void taskKilled(TezTaskAttemptID taskAttemptId) { // TODO Unregister the task for state updates, which could in turn unregister the node. 
getTaskCommunicatorContext().taskKilled(taskAttemptId, - TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM, "Attempt preempted"); + TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted"); entityTracker.unregisterTaskAttempt(taskAttemptId); } @@ -480,6 +530,42 @@ void registerContainer(ContainerId containerId, String hostname, int port) { containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port)); } + LlapNodeId getNodeIdForContainer(ContainerId containerId) { + return containerToNodeMap.get(containerId); + } + + LlapNodeId getNodeIfForTaskAttempt(TezTaskAttemptID taskAttemptId) { + return attemptToNodeMap.get(taskAttemptId); + } + + ContainerId getContainerIdForAttempt(TezTaskAttemptID taskAttemptId) { + LlapNodeId llapNodeId = getNodeIfForTaskAttempt(taskAttemptId); + if (llapNodeId != null) { + BiMap bMap = nodeMap.get(llapNodeId).inverse(); + if (bMap != null) { + return bMap.get(taskAttemptId); + } else { + return null; + } + } else { + return null; + } + } + + TezTaskAttemptID getTaskAttemptIdForContainer(ContainerId containerId) { + LlapNodeId llapNodeId = getNodeIdForContainer(containerId); + if (llapNodeId != null) { + BiMap bMap = nodeMap.get(llapNodeId); + if (bMap != null) { + return bMap.get(containerId); + } else { + return null; + } + } else { + return null; + } + } + void unregisterContainer(ContainerId containerId) { LlapNodeId llapNodeId = containerToNodeMap.remove(containerId); if (llapNodeId == null) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java index cec17f9..d357d61 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java @@ -41,6 +41,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.net.NetUtils; @@ -84,21 +86,10 @@ public void serviceStop() { executor.shutdownNow(); } - public void submitWork(SubmitWorkRequestProto request, String host, int port, + public void sendSubmitWork(SubmitWorkRequestProto request, String host, int port, final ExecuteRequestCallback callback) { ListenableFuture future = executor.submit(new SubmitWorkCallable(host, port, request)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(SubmitWorkResponseProto result) { - callback.setResponse(result); - } - - @Override - public void onFailure(Throwable t) { - callback.indicateError(t); - } - }); - + Futures.addCallback(future, new ResponseCallback(callback)); } public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host, @@ -106,17 +97,7 @@ public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final ExecuteRequestCallback callback) { ListenableFuture future = executor.submit(new SendSourceStateUpdateCallable(host, port, request)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(SourceStateUpdatedResponseProto result) { - callback.setResponse(result); - } - - @Override - public void onFailure(Throwable t) { - callback.indicateError(t); - } - }); + Futures.addCallback(future, new ResponseCallback(callback)); } public void sendQueryComplete(final QueryCompleteRequestProto request, final String host, @@ -124,17 +105,34 @@ public void 
sendQueryComplete(final QueryCompleteRequestProto request, final Str final ExecuteRequestCallback callback) { ListenableFuture future = executor.submit(new SendQueryCompleteCallable(host, port, request)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(QueryCompleteResponseProto result) { - callback.setResponse(result); - } + Futures.addCallback(future, new ResponseCallback(callback)); + } - @Override - public void onFailure(Throwable t) { - callback.indicateError(t); - } - }); + public void sendTerminateFragment(final TerminateFragmentRequestProto request, final String host, + final int port, + final ExecuteRequestCallback callback) { + ListenableFuture future = + executor.submit(new SendTerminateFragmentCallable(host, port, request)); + Futures.addCallback(future, new ResponseCallback(callback)); + } + + private static class ResponseCallback implements FutureCallback { + + private final ExecuteRequestCallback callback; + + public ResponseCallback(ExecuteRequestCallback callback) { + this.callback = callback; + } + + @Override + public void onSuccess(TYPE result) { + callback.setResponse(result); + } + + @Override + public void onFailure(Throwable t) { + callback.indicateError(t); + } } private static abstract class CallableRequest @@ -195,6 +193,20 @@ public QueryCompleteResponseProto call() throws Exception { } } + private class SendTerminateFragmentCallable + extends CallableRequest { + + protected SendTerminateFragmentCallable(String hostname, int port, + TerminateFragmentRequestProto terminateFragmentRequestProto) { + super(hostname, port, terminateFragmentRequestProto); + } + + @Override + public TerminateFragmentResponseProto call() throws Exception { + return getProxy(hostname, port).terminateFragment(null, request); + } + } + public interface ExecuteRequestCallback { void setResponse(T response); void indicateError(Throwable t); diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java 
llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index d355029..5a2b77d 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -26,9 +26,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -39,6 +41,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -96,6 +99,8 @@ public int compare(Priority o1, Priority o2) { // Tracks running and queued tasks. Cleared after a task completes. private final ConcurrentMap knownTasks = new ConcurrentHashMap<>(); + private final TreeMap> runningTasks = new TreeMap<>(); + private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator(); // Queue for disabled nodes. Nodes make it out of this queue when their expiration timeout is hit. 
@VisibleForTesting @@ -119,6 +124,7 @@ public int compare(Priority o1, Priority o2) { private final SchedulerCallable schedulerCallable = new SchedulerCallable(); private final AtomicBoolean isStopped = new AtomicBoolean(false); + private final AtomicInteger pendingPreemptions = new AtomicInteger(0); private final NodeBlacklistConf nodeBlacklistConf; @@ -134,7 +140,6 @@ public int compare(Priority o1, Priority o2) { private final LlapRegistryService registry = new LlapRegistryService(false); - private volatile ListenableFuture nodeEnablerFuture; private volatile ListenableFuture schedulerFuture; @@ -385,6 +390,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container trySchedulingPendingTasks(); } + // This may be invoked before a container is ever assigned to a task. allocateTask... app decides // the task is no longer required, and asks for a de-allocation. @Override @@ -392,7 +398,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd writeLock.lock(); // Updating several local structures TaskInfo taskInfo; try { - taskInfo = knownTasks.remove(task); + taskInfo = unregisterTask(task); if (taskInfo == null) { LOG.error("Could not determine ContainerId for task: " + task @@ -418,12 +424,12 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance); assert nodeInfo != null; - if (taskSucceeded) { - // The node may have been blacklisted at this point - which means it may not be in the - // activeNodeList. - - nodeInfo.registerTaskSuccess(); + // Re-enable the node if preempted + if (taskInfo.preempted) { + LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason); + pendingPreemptions.decrementAndGet(); + nodeInfo.registerUnsuccessfulTaskEnd(true); if (nodeInfo.isDisabled()) { // Re-enable the node. If a task succeeded, a slot may have become available. 
// Also reset commFailures since a task was able to communicate back and indicate success. @@ -435,20 +441,40 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd } // In case of success, trigger a scheduling run for pending tasks. trySchedulingPendingTasks(); - - } else if (!taskSucceeded) { - nodeInfo.registerUnsuccessfulTaskEnd(); - if (endReason != null && EnumSet - .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR) - .contains(endReason)) { - if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) { - dagStats.registerCommFailure(taskInfo.assignedInstance.getHost()); - } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) { - dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost()); + } else { + if (taskSucceeded) { + // The node may have been blacklisted at this point - which means it may not be in the + // activeNodeList. + + nodeInfo.registerTaskSuccess(); + + if (nodeInfo.isDisabled()) { + // Re-enable the node. If a task succeeded, a slot may have become available. + // Also reset commFailures since a task was able to communicate back and indicate success. + nodeInfo.enableNode(); + // Re-insert into the queue to force the poll thread to remove the element. + if (disabledNodesQueue.remove(nodeInfo)) { + disabledNodesQueue.add(nodeInfo); + } } + // In case of success, trigger a scheduling run for pending tasks. 
+ trySchedulingPendingTasks(); + + } else if (!taskSucceeded) { + nodeInfo.registerUnsuccessfulTaskEnd(false); + if (endReason != null && EnumSet + .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR) + .contains(endReason)) { + if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) { + dagStats.registerCommFailure(taskInfo.assignedInstance.getHost()); + } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) { + dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost()); + } + } + boolean commFailure = + endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR; + disableInstance(assignedInstance, commFailure); } - boolean commFailure = endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR; - disableInstance(assignedInstance, commFailure); } } finally { writeLock.unlock(); @@ -461,7 +487,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd public Object deallocateContainer(ContainerId containerId) { LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId); // Containers are not being tracked for re-use. - // This is safe to ignore since a deallocate task should have come in earlier. + // This is safe to ignore since a deallocate task will come in. 
return null; } @@ -635,6 +661,7 @@ private void addPendingTask(TaskInfo taskInfo) { } } + /* Remove a task from the pending list */ private void removePendingTask(TaskInfo taskInfo) { writeLock.lock(); try { @@ -649,6 +676,48 @@ private void removePendingTask(TaskInfo taskInfo) { } } + /* Register a running task into the runningTasks structure */ + private void registerRunningTask(TaskInfo taskInfo) { + writeLock.lock(); + try { + int priority = taskInfo.priority.getPriority(); + TreeSet tasksAtpriority = runningTasks.get(priority); + if (tasksAtpriority == null) { + tasksAtpriority = new TreeSet<>(TASK_INFO_COMPARATOR); + runningTasks.put(priority, tasksAtpriority); + } + tasksAtpriority.add(taskInfo); + } finally { + writeLock.unlock(); + } + } + + /* Unregister a task from the known and running structures */ + private TaskInfo unregisterTask(Object task) { + writeLock.lock(); + try { + TaskInfo taskInfo = knownTasks.remove(task); + if (taskInfo != null) { + if (taskInfo.assigned) { + // Remove from the running list. + int priority = taskInfo.priority.getPriority(); + Set tasksAtPriority = runningTasks.get(priority); + Preconditions.checkState(tasksAtPriority != null, + "runningTasks should contain an entry if the task was in running state. Caused by task: {}", task); + tasksAtPriority.remove(taskInfo); + if (tasksAtPriority.isEmpty()) { + runningTasks.remove(priority); + } + } + } else { + LOG.warn("Could not find TaskInfo for task: {}. Not removing it from the running set", task); + } + return taskInfo; + } finally { + writeLock.unlock(); + } + } + @VisibleForTesting protected void schedulePendingTasks() { writeLock.lock(); @@ -674,6 +743,13 @@ protected void schedulePendingTasks() { if (scheduled) { taskIter.remove(); } else { + // Try pre-empting a task so that a higher priority task can take it's place. + // Preempt only if there's not pending preemptions to avoid preempting twice for a task. 
+ LOG.info("Attempting to preempt for {}, pendingPreemptions={}", taskInfo.task, pendingPreemptions.get()); + if (pendingPreemptions.get() == 0) { + preemptTasks(entry.getKey().getPriority(), 1); + } + scheduledAllAtPriority = false; // Don't try assigning tasks at the next priority. break; @@ -705,10 +781,11 @@ private boolean scheduleTask(TaskInfo taskInfo) { nsPair.getServiceInstance().getRpcPort()); writeLock.lock(); // While updating local structures try { + LOG.info("Assigned task {} to container {}", taskInfo, container.getId()); dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, nsPair.getServiceInstance().getHost()); - taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId()); - knownTasks.putIfAbsent(taskInfo.task, taskInfo); + taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId(), clock.getTime()); + registerRunningTask(taskInfo); nsPair.getNodeInfo().registerTaskScheduled(); } finally { writeLock.unlock(); @@ -719,6 +796,59 @@ private boolean scheduleTask(TaskInfo taskInfo) { } } + // Removes tasks from the runningList and sends out a preempt request to the system. + // Subsequent tasks will be scheduled again once the de-allocate request for the preempted + // task is processed. 
+ private void preemptTasks(int forPriority, int numTasksToPreempt) { + writeLock.lock(); + List preemptedTaskList = null; + try { + NavigableMap> orderedMap = runningTasks.descendingMap(); + Iterator>> iterator = orderedMap.entrySet().iterator(); + int preemptedCount = 0; + while (iterator.hasNext() && preemptedCount < numTasksToPreempt) { + Entry> entryAtPriority = iterator.next(); + if (entryAtPriority.getKey() > forPriority) { + Iterator taskInfoIterator = entryAtPriority.getValue().iterator(); + while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) { + TaskInfo taskInfo = taskInfoIterator.next(); + preemptedCount++; + LOG.info("preempting {} for task at priority {}", taskInfo, forPriority); + taskInfo.setPreemptedInfo(clock.getTime()); + if (preemptedTaskList == null) { + preemptedTaskList = new LinkedList<>(); + } + dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost()); + preemptedTaskList.add(taskInfo); + pendingPreemptions.incrementAndGet(); + // Remove from the runningTaskList + taskInfoIterator.remove(); + } + + // Remove entire priority level if it's been emptied. + if (entryAtPriority.getValue().isEmpty()) { + iterator.remove(); + } + } else { + // No tasks qualify as preemptable + LOG.info("DBG: No tasks qualify as killable to schedule tasks at priority {}", forPriority); + break; + } + } + } finally { + writeLock.unlock(); + } + // Send out the preempted request outside of the lock. + if (preemptedTaskList != null) { + for (TaskInfo taskInfo : preemptedTaskList) { + LOG.info("DBG: Preempting task {}", taskInfo); + appClientDelegate.preemptContainer(taskInfo.containerId); + } + } + // The schedule loop will be triggered again when the deallocateTask request comes in for the + // preempted task. 
+ } + private class NodeEnablerCallable implements Callable { private AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -832,6 +962,7 @@ public void shutdown() { // Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc. private boolean disabled = false; + private int numPreemptedTasks = 0; private int numScheduledTasks = 0; private final int numSchedulableTasks; @@ -917,8 +1048,11 @@ void registerTaskSuccess() { numScheduledTasks--; } - void registerUnsuccessfulTaskEnd() { + void registerUnsuccessfulTaskEnd(boolean wasPreempted) { numScheduledTasks--; + if (wasPreempted) { + numPreemptedTasks++; + } } public boolean isDisabled() { @@ -980,12 +1114,14 @@ public String toString() { int numRejectedTasks = 0; int numCommFailures = 0; int numDelayedAllocations = 0; + int numPreemptedTasks = 0; Map localityBasedNumAllocationsPerHost = new HashMap<>(); Map numAllocationsPerHost = new HashMap<>(); @Override public String toString() { StringBuilder sb = new StringBuilder(); + sb.append("NumPreemptedTasks=").append(numPreemptedTasks).append(", "); sb.append("NumRequestedAllocations=").append(numRequestedAllocations).append(", "); sb.append("NumRequestsWithlocation=").append(numRequestsWithLocation).append(", "); sb.append("NumLocalAllocations=").append(numLocalAllocations).append(","); @@ -1027,6 +1163,10 @@ void registerTaskAllocated(String[] requestedHosts, String[] requestedRacks, _registerAllocationInHostMap(allocatedHost, numAllocationsPerHost); } + void registerTaskPreempted(String host) { + numPreemptedTasks++; + } + void registerCommFailure(String host) { numCommFailures++; } @@ -1050,6 +1190,10 @@ private void _registerAllocationInHostMap(String host, Map>> 32)); + result = 31 * result + task.hashCode(); + return result; + } + + @Override + public String toString() { + return "TaskInfo{" + + "task=" + task + + ", priority=" + priority + + ", startTime=" + startTime + + ", containerId=" + containerId + + ", assignedInstance=" + 
assignedInstance + + ", uniqueId=" + uniqueId + + '}'; + } + } + + // Newer tasks first. + private static class TaskStartComparator implements Comparator { + + @Override + public int compare(TaskInfo o1, TaskInfo o2) { + if (o1.startTime > o2.startTime) { + return -1; + } else if (o1.startTime < o2.startTime) { + return 1; + } else { + // Comparing on time is not sufficient since two may be created at the same time, + // in which case inserting into a TreeSet/Map would break + if (o1.uniqueId > o2.uniqueId) { + return -1; + } else if (o1.uniqueId < o2.uniqueId) { + return 1; + } else { + return 0; + } + } + } } private static class NodeServiceInstancePair { diff --git llap-server/src/protobuf/LlapDaemonProtocol.proto llap-server/src/protobuf/LlapDaemonProtocol.proto index d8fd882..0ba6acf 100644 --- llap-server/src/protobuf/LlapDaemonProtocol.proto +++ llap-server/src/protobuf/LlapDaemonProtocol.proto @@ -48,7 +48,7 @@ message GroupInputSpecProto { message FragmentSpecProto { - optional string task_attempt_id_string = 1; + optional string fragment_identifier_string = 1; optional string dag_name = 2; optional string vertex_name = 3; optional EntityDescriptorProto processor_descriptor = 4; @@ -111,10 +111,7 @@ message QueryCompleteResponseProto { message TerminateFragmentRequestProto { optional string query_id = 1; optional string dag_name = 2; - optional int32 dag_attempt_number = 3; - optional string vertex_name = 4; - optional int32 fragment_number = 5; - optional int32 attempt_number = 6; + optional string fragment_identifier_string = 7; } message TerminateFragmentResponseProto { diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index a2e9501..6b6fac0 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ 
llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; +import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; @@ -101,7 +102,7 @@ private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism .setVertexParallelism(parallelism) .setProcessorDescriptor( EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) - .setTaskAttemptIdString(taId.toString()).build()).setAmHost("localhost") + .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost") .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1") .setContainerIdString("MockContainer_1").setUser("MockUser") .setTokenIdentifier("MockToken_1") @@ -115,12 +116,12 @@ private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism @Test public void testWaitQueueComparator() throws InterruptedException { - MockRequest r1 = new MockRequest(createRequest(1, 2, 100), false, 100000); - MockRequest r2 = new MockRequest(createRequest(2, 4, 200), false, 100000); - MockRequest r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000); - MockRequest r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000); - MockRequest r5 = new MockRequest(createRequest(5, 10, 500), false, 1000000); - EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue( + TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 100), false, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 200), false, 100000); + TaskWrapper r3 
= createTaskWrapper(createRequest(3, 6, 300), false, 1000000); + TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000); + TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 500), false, 1000000); + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); @@ -137,11 +138,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); - r1 = new MockRequest(createRequest(1, 2, 100), true, 100000); - r2 = new MockRequest(createRequest(2, 4, 200), true, 100000); - r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000); - r4 = new MockRequest(createRequest(4, 8, 400), true, 1000000); - r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createRequest(1, 2, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 200), true, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 400), true, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -159,11 +160,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); - r1 = new MockRequest(createRequest(1, 1, 100), true, 100000); - r2 = new MockRequest(createRequest(2, 1, 200), false, 100000); - r3 = new MockRequest(createRequest(3, 1, 300), true, 1000000); - r4 = new MockRequest(createRequest(4, 1, 400), false, 1000000); - r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createRequest(1, 1, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 1, 200), false, 100000); + r3 = 
createTaskWrapper(createRequest(3, 1, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 1, 400), false, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -181,11 +182,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); - r1 = new MockRequest(createRequest(1, 2, 100), true, 100000); - r2 = new MockRequest(createRequest(2, 4, 200), false, 100000); - r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000); - r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000); - r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createRequest(1, 2, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 200), false, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -203,11 +204,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); - r1 = new MockRequest(createRequest(1, 2, 100), true, 100000); - r2 = new MockRequest(createRequest(2, 4, 200), false, 100000); - r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000); - r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000); - r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createRequest(1, 2, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 200), false, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 300), false, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 400), false, 
1000000); + r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -225,11 +226,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); - r1 = new MockRequest(createRequest(1, 2, 100), false, 100000); - r2 = new MockRequest(createRequest(2, 4, 200), true, 100000); - r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000); - r4 = new MockRequest(createRequest(4, 8, 400), true, 1000000); - r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createRequest(1, 2, 100), false, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 200), true, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 400), true, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -250,12 +251,13 @@ public void testWaitQueueComparator() throws InterruptedException { @Test public void testPreemptionQueueComparator() throws InterruptedException { - MockRequest r1 = new MockRequest(createRequest(1, 2, 100), false, 100000); - MockRequest r2 = new MockRequest(createRequest(2, 4, 200), false, 100000); - MockRequest r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000); - MockRequest r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000); - BlockingQueue queue = new PriorityBlockingQueue(4, + TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 100), false, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 200), false, 100000); + TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 300), false, 1000000); + TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000); + 
BlockingQueue queue = new PriorityBlockingQueue<>(4, new TaskExecutorService.PreemptionQueueComparator()); + queue.offer(r1); assertEquals(r1, queue.peek()); queue.offer(r2); @@ -268,4 +270,10 @@ public void testPreemptionQueueComparator() throws InterruptedException { assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); } + + private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) { + MockRequest mockRequest = new MockRequest(request, canFinish, workTime); + TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null); + return taskWrapper; + } } diff --git llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java index a0399da..b9b89e3 100644 --- llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.SystemClock; @@ -49,6 +50,8 @@ public class TestLlapTaskSchedulerService { + // TODO Fix the races and the broken scheduler control in the tests + private static final String HOST1 = "host1"; private static final String HOST2 = "host2"; private static final String HOST3 = "host3"; @@ -94,6 +97,63 @@ public void testSimpleNoLocalityAllocation() { } } + // TODO Add a test to ensure the correct task is being preempted, and the completion for the specific + // task triggers the next task to be scheduled. 
+ + @Test(timeout=5000) + public void testPreemption() throws InterruptedException { + + Priority priority1 = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + String [] hosts = new String[] {HOST1}; + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1); + try { + + Object task1 = new String("task1"); + Object clientCookie1 = new String("cookie1"); + Object task2 = new String("task2"); + Object clientCookie2 = new String("cookie1"); + Object task3 = new String("task3"); + Object clientCookie3 = new String("cookie1"); + Object task4 = new String("task4"); + Object clientCookie4 = new String("cookie1"); + + tsWrapper.controlScheduler(true); + int schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); + tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1); + tsWrapper.allocateTask(task2, hosts, priority2, clientCookie2); + tsWrapper.allocateTask(task3, hosts, priority2, clientCookie3); + tsWrapper.signalScheduler(); + tsWrapper.controlScheduler(false); + tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1); + verify(tsWrapper.mockAppCallback, times(2)).taskAllocated(any(Object.class), + any(Object.class), any(Container.class)); + assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + + reset(tsWrapper.mockAppCallback); + + tsWrapper.controlScheduler(true); + schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); + tsWrapper.allocateTask(task4, hosts, priority1, clientCookie4); + tsWrapper.controlScheduler(false); + tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1); + verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class)); + + schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); + tsWrapper.deallocateTask(task2, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); + tsWrapper.signalScheduler(); + Thread.sleep(2000l); + + verify(tsWrapper.mockAppCallback, 
times(1)).taskAllocated(eq(task4), + eq(clientCookie4), any(Container.class)); + + } finally { + tsWrapper.shutdown(); + } + + } + @Test(timeout=5000) public void testNodeDisabled() { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l); @@ -225,9 +285,15 @@ public void testNodeReEnabled() throws InterruptedException { } TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) { + this(2000l, new String[]{HOST1, HOST2, HOST3}, 4, + LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT); + } + + TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) { conf = new Configuration(); - conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, HOST1, HOST2, HOST3); - conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, 4); + conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, hosts); + conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, numExecutors); + conf.setInt(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE, waitQueueSize); conf.setLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS, disableTimeoutMillis); conf.setBoolean(LlapTaskSchedulerServiceForTest.LLAP_TASK_SCHEDULER_IN_TEST, true); @@ -270,6 +336,10 @@ void allocateTask(Object task, String[] hosts, Priority priority, Object clientC ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie); } + void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) { + ts.deallocateTask(task, succeeded, endReason); + } + void rejectExecution(Object task) { ts.deallocateTask(task, false, TaskAttemptEndReason.SERVICE_BUSY); }