diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java index 57e0bce..ce4ec71 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java @@ -194,7 +194,7 @@ public void preempt(Context ctxt, PreemptionMessage preemptionRequests) { Collections.sort(listOfCont, new Comparator() { @Override public int compare(final Container o1, final Container o2) { - return o2.getId().getId() - o1.getId().getId(); + return o2.getId().compareTo(o1.getId()); } }); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java index 73e8085..6c4a180 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java @@ -38,9 +38,17 @@ @Unstable public static ContainerId newInstance(ApplicationAttemptId appAttemptId, int containerId) { + return newInstance(appAttemptId, containerId, 0L); + } + + @Private + @Unstable + public static ContainerId newInstance(ApplicationAttemptId appAttemptId, + int containerId, long epoch) { ContainerId id = Records.newRecord(ContainerId.class); id.setId(containerId); id.setApplicationAttemptId(appAttemptId); + id.setEpoch(epoch); id.build(); return id; } @@ -80,7 +88,14 @@ public static ContainerId newInstance(ApplicationAttemptId appAttemptId, @Private @Unstable protected abstract void setId(int id); - + + @Private + @Unstable + public abstract long getEpoch(); + + @Private + @Unstable + protected abstract void setEpoch(long epoch); // TODO: fail the app submission if attempts are more than 10 or something private static final ThreadLocal appAttemptIdFormat = @@ -112,6 +127,8 @@ public int hashCode() { final int prime = 435569; int result = 7507; result = prime * result + getId(); + result = prime * result + (int) (getEpoch() >> 32) & 0xffffffff; + result = prime * result + (int) getEpoch() & 0xffffffff; result = prime * result + getApplicationAttemptId().hashCode(); return result; } @@ -127,6 +144,8 @@ public boolean equals(Object obj) { ContainerId other = (ContainerId) obj; if (!this.getApplicationAttemptId().equals(other.getApplicationAttemptId())) return false; + if (this.getEpoch() != other.getEpoch()) + return false; if (this.getId() != other.getId()) return false; return true; @@ -136,7 +155,11 @@ public boolean equals(Object obj) { public int compareTo(ContainerId other) { if (this.getApplicationAttemptId().compareTo( other.getApplicationAttemptId()) == 0) { - return this.getId() - other.getId(); + if (this.getEpoch() - other.getEpoch() == 0) { + return this.getId() - other.getId(); + } else{ + return Long.compare(this.getEpoch(), other.getEpoch()); + } } else { return this.getApplicationAttemptId().compareTo( other.getApplicationAttemptId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3f1fa6c..3dbaa76 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -51,6 +51,7 @@ message ContainerIdProto { optional ApplicationIdProto app_id = 1; optional ApplicationAttemptIdProto app_attempt_id = 2; optional int32 id = 3; + optional int64 epoch = 4; } message ResourceProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java index 9be829f..0ef9a02 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java @@ -59,6 +59,17 @@ protected void setId(int id) { builder.setId((id)); } + @Override + public long getEpoch() { + Preconditions.checkNotNull(proto); + return proto.getEpoch(); + } + + @Override + protected void setEpoch(long epoch) { + Preconditions.checkNotNull(builder); + builder.setEpoch(epoch); + } @Override public ApplicationAttemptId getApplicationAttemptId() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index 8b8177a..8c777b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -129,6 +129,7 @@ public void write(DataOutput out) throws IOException { out.writeInt(applicationId.getId()); out.writeInt(applicationAttemptId.getAttemptId()); out.writeInt(this.containerId.getId()); + out.writeLong(this.containerId.getEpoch()); out.writeUTF(this.nmHostAddr); out.writeUTF(this.appSubmitter); out.writeInt(this.resource.getMemory()); @@ -147,7 +148,8 @@ public void readFields(DataInput in) throws IOException { ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, in.readInt()); this.containerId = - ContainerId.newInstance(applicationAttemptId, in.readInt()); + ContainerId.newInstance(applicationAttemptId, + in.readInt(), in.readLong()); this.nmHostAddr = in.readUTF(); this.appSubmitter = in.readUTF(); int memory = in.readInt(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestContainerId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestContainerId.java index f92df8a..57f2c10 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestContainerId.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestContainerId.java @@ -30,11 +30,11 @@ @Test public void testContainerId() { - ContainerId c1 = newContainerId(1, 1, 10l, 1); - ContainerId c2 = newContainerId(1, 1, 10l, 2); - ContainerId c3 = newContainerId(1, 1, 10l, 1); - ContainerId c4 = newContainerId(1, 3, 10l, 1); - ContainerId c5 = newContainerId(1, 3, 8l, 1); + ContainerId c1 = newContainerId(1, 1, 10l, 1, 0L); + ContainerId c2 = newContainerId(1, 1, 10l, 2, 0L); + ContainerId c3 = newContainerId(1, 1, 10l, 1, 0L); + ContainerId c4 = newContainerId(1, 3, 10l, 1, 0L); + ContainerId c5 = newContainerId(1, 3, 8l, 1, 0L); Assert.assertTrue(c1.equals(c3)); Assert.assertFalse(c1.equals(c2)); @@ -52,17 +52,34 @@ public void testContainerId() { Assert.assertFalse(c1.hashCode() == c5.hashCode()); long ts = System.currentTimeMillis(); - ContainerId c6 = newContainerId(36473, 4365472, ts, 25645811); + ContainerId c6 = newContainerId(36473, 4365472, ts, 25645811, 0L); Assert.assertEquals("container_10_0001_01_000001", c1.toString()); Assert.assertEquals("container_" + ts + "_36473_4365472_25645811", c6.toString()); + + ContainerId c7 = newContainerId(36473, 4365472, ts, 25645811, 0xfffffffffL); + ContainerId c8 = newContainerId(36473, 4365472, ts, 25645811, 0xfffffffffL); + + Assert.assertNotEquals(c6, c7); + Assert.assertEquals(c7, c8); + + Assert.assertFalse(c6.equals(c7)); + Assert.assertTrue(c7.equals(c8)); + + Assert.assertNotEquals(c6.hashCode(), c7.hashCode()); + Assert.assertEquals(c7.hashCode(), c8.hashCode()); + + Assert.assertTrue(c6.compareTo(c7) < 0); + Assert.assertTrue(c7.compareTo(c6) > 0); + Assert.assertEquals(0, c7.compareTo(c8)); + } public static ContainerId newContainerId(int appId, int appAttemptId, - long timestamp, int containerId) { + long timestamp, int containerId, long epoch) { ApplicationId applicationId = ApplicationId.newInstance(timestamp, appId); ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, appAttemptId); - return ContainerId.newInstance(applicationAttemptId, containerId); + return ContainerId.newInstance(applicationAttemptId, containerId, epoch); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 64eb428..a3a3120 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -138,16 +138,30 @@ public static ApplicationId convert(long clustertimestamp, CharSequence id) { } public static ContainerId newContainerId(ApplicationAttemptId appAttemptId, + int containerId, long epoch) { + return ContainerId.newInstance(appAttemptId, containerId, epoch); + } + + @Deprecated + @VisibleForTesting + public static ContainerId newContainerId(ApplicationAttemptId appAttemptId, int containerId) { return ContainerId.newInstance(appAttemptId, containerId); } - + + @Deprecated + @VisibleForTesting public static ContainerId newContainerId(int appId, int appAttemptId, long timestamp, int id) { + return newContainerId(appId, appAttemptId, timestamp, id, 0L); + } + + public static ContainerId newContainerId(int appId, int appAttemptId, + long timestamp, int id, long epoch) { ApplicationId applicationId = newApplicationId(timestamp, appId); ApplicationAttemptId applicationAttemptId = newApplicationAttemptId( applicationId, appAttemptId); - ContainerId cId = newContainerId(applicationAttemptId, id); + ContainerId cId = newContainerId(applicationAttemptId, id, epoch); return cId; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 3a51417..8768b33 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -83,6 +83,7 @@ private Resource amResource; private boolean unmanagedAM = true; private boolean amRunning = false; + private long epoch = 0; protected List newlyAllocatedContainers = new ArrayList(); @@ -114,6 +115,7 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, new AppSchedulingInfo(applicationAttemptId, user, queue, activeUsersManager, rmContext.getEpoch()); this.queue = queue; + this.epoch = rmContext.getEpoch(); if (rmContext.getRMApps() != null && rmContext.getRMApps() @@ -559,4 +561,8 @@ public synchronized void recoverContainer(RMContainer rmContainer) { // schedulingOpportunities // lastScheduledContainer } + + public long getEpoch() { + return epoch; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 65938aa..445c557 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1285,8 +1285,9 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, Resource capability, Priority priority) { NodeId nodeId = node.getRMNode().getNodeID(); - ContainerId containerId = BuilderUtils.newContainerId(application - .getApplicationAttemptId(), application.getNewContainerId()); + ContainerId containerId = BuilderUtils.newContainerId( + application.getApplicationAttemptId(), application.getNewContainerId(), + application.getEpoch()); // Create the container Container container = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 0c36c55..4d173d0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -159,8 +159,9 @@ public Container createContainer( Resource capability, Priority priority) { NodeId nodeId = node.getRMNode().getNodeID(); - ContainerId containerId = BuilderUtils.newContainerId(application - .getApplicationAttemptId(), application.getNewContainerId()); + ContainerId containerId = BuilderUtils.newContainerId( + application.getApplicationAttemptId(), + application.getNewContainerId(), application.getEpoch()); // Create the container Container container = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index b017db7..4f75c49 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -663,8 +663,9 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application for (int i=0; i < assignedContainers; ++i) { NodeId nodeId = node.getRMNode().getNodeID(); - ContainerId containerId = BuilderUtils.newContainerId(application - .getApplicationAttemptId(), application.getNewContainerId()); + ContainerId containerId = BuilderUtils.newContainerId( + application.getApplicationAttemptId(), + application.getNewContainerId(), application.getEpoch()); // Create the container Container container =