diff --git hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2ce54c4..fa28a16 100644 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -591,12 +591,6 @@ Release 2.8.0 - UNRELEASED HDFS-8489. Subclass BlockInfo to represent contiguous blocks. (Zhe Zhang via jing9) - HDFS-8386. Improve synchronization of 'streamer' reference in - DFSOutputStream. (Rakesh R via wang) - - HDFS-8513. Rename BlockPlacementPolicyRackFaultTolarent to - BlockPlacementPolicyRackFaultTolerant. (wang) - OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than @@ -930,9 +924,6 @@ Release 2.7.1 - UNRELEASED HDFS-8451. DFSClient probe for encryption testing interprets empty URI property for "enabled". (Steve Loughran via xyao) - HDFS-8486. DN startup may cause severe data loss (Daryn Sharp via Colin P. - McCabe) - Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 1dc4a9f..ae5d3eb 100755 --- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -139,7 +139,7 @@ protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetIn @Override protected void checkClosed() throws IOException { if (isClosed()) { - getStreamer().getLastException().throwException4Close(); + streamer.getLastException().throwException4Close(); } } @@ -148,10 +148,10 @@ protected void checkClosed() throws IOException { // @VisibleForTesting public synchronized DatanodeInfo[] getPipeline() { - if (getStreamer().streamerClosed()) { + if (streamer.streamerClosed()) { return null; } - DatanodeInfo[] currentNodes = getStreamer().getNodes(); + DatanodeInfo[] currentNodes = streamer.getNodes(); if (currentNodes == null) { return null; } @@ -293,9 +293,9 @@ private DFSOutputStream(DFSClient dfsClient, String src, // indicate that we are appending to an existing block streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager); - getStreamer().setBytesCurBlock(lastBlock.getBlockSize()); + streamer.setBytesCurBlock(lastBlock.getBlockSize()); adjustPacketChunkSize(stat); - getStreamer().setPipelineInConstruction(lastBlock); + streamer.setPipelineInConstruction(lastBlock); } else { computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); @@ -329,7 +329,7 @@ private void adjustPacketChunkSize(HdfsFileStatus stat) throws IOException{ // computePacketChunkSize(0, freeInCksum); setChecksumBufSize(freeInCksum); - getStreamer().setAppendChunk(true); + streamer.setAppendChunk(true); } else { // if the remaining space in the block is smaller than // that expected size of of a packet, then create @@ -392,36 +392,36 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, } if (currentPacket == null) { - currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer() - .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false); + currentPacket = createPacket(packetSize, chunksPerPacket, + streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + currentPacket.getSeqno() + ", src=" + src + ", 
packetSize=" + packetSize + ", chunksPerPacket=" + chunksPerPacket + - ", bytesCurBlock=" + getStreamer().getBytesCurBlock()); + ", bytesCurBlock=" + streamer.getBytesCurBlock()); } } currentPacket.writeChecksum(checksum, ckoff, cklen); currentPacket.writeData(b, offset, len); currentPacket.incNumChunks(); - getStreamer().incBytesCurBlock(len); + streamer.incBytesCurBlock(len); // If packet is full, enqueue it for transmission // if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || - getStreamer().getBytesCurBlock() == blockSize) { + streamer.getBytesCurBlock() == blockSize) { if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" + currentPacket.getSeqno() + ", src=" + src + - ", bytesCurBlock=" + getStreamer().getBytesCurBlock() + + ", bytesCurBlock=" + streamer.getBytesCurBlock() + ", blockSize=" + blockSize + - ", appendChunk=" + getStreamer().getAppendChunk()); + ", appendChunk=" + streamer.getAppendChunk()); } - getStreamer().waitAndQueuePacket(currentPacket); + streamer.waitAndQueuePacket(currentPacket); currentPacket = null; adjustChunkBoundary(); @@ -436,14 +436,14 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, * crc chunks from now on. */ protected void adjustChunkBoundary() { - if (getStreamer().getAppendChunk() && - getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) { - getStreamer().setAppendChunk(false); + if (streamer.getAppendChunk() && + streamer.getBytesCurBlock() % bytesPerChecksum == 0) { + streamer.setAppendChunk(false); resetChecksumBufSize(); } - if (!getStreamer().getAppendChunk()) { - int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()), + if (!streamer.getAppendChunk()) { + int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()), dfsClient.getConf().getWritePacketSize()); computePacketChunkSize(psize, bytesPerChecksum); } @@ -456,13 +456,13 @@ protected void adjustChunkBoundary() { * @throws IOException */ protected void endBlock() throws IOException { - if (getStreamer().getBytesCurBlock() == blockSize) { - currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), - getStreamer().getAndIncCurrentSeqno(), true); + if (streamer.getBytesCurBlock() == blockSize) { + currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), + streamer.getAndIncCurrentSeqno(), true); currentPacket.setSyncBlock(shouldSyncBlock); - getStreamer().waitAndQueuePacket(currentPacket); + streamer.waitAndQueuePacket(currentPacket); currentPacket = null; - getStreamer().setBytesCurBlock(0); + streamer.setBytesCurBlock(0); lastFlushOffset = 0; } } @@ -551,33 +551,30 @@ private void flushOrSync(boolean isSync, EnumSet syncFlags) if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient flush(): " - + " bytesCurBlock=" + getStreamer().getBytesCurBlock() + + " bytesCurBlock=" + streamer.getBytesCurBlock() + " lastFlushOffset=" + lastFlushOffset + " createNewBlock=" + endBlock); } // Flush only if we haven't already flushed till this offset. - if (lastFlushOffset != getStreamer().getBytesCurBlock()) { - assert getStreamer().getBytesCurBlock() > lastFlushOffset; + if (lastFlushOffset != streamer.getBytesCurBlock()) { + assert streamer.getBytesCurBlock() > lastFlushOffset; // record the valid offset of this flush - lastFlushOffset = getStreamer().getBytesCurBlock(); + lastFlushOffset = streamer.getBytesCurBlock(); if (isSync && currentPacket == null && !endBlock) { // Nothing to send right now, // but sync was requested. 
// Send an empty packet if we do not end the block right now currentPacket = createPacket(packetSize, chunksPerPacket, - getStreamer().getBytesCurBlock(), getStreamer() - .getAndIncCurrentSeqno(), false); + streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false); } } else { - if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) { + if (isSync && streamer.getBytesCurBlock() > 0 && !endBlock) { // Nothing to send right now, // and the block was partially written, // and sync was requested. - // So send an empty sync packet if we do not end the block right - // now + // So send an empty sync packet if we do not end the block right now currentPacket = createPacket(packetSize, chunksPerPacket, - getStreamer().getBytesCurBlock(), getStreamer() - .getAndIncCurrentSeqno(), false); + streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false); } else if (currentPacket != null) { // just discard the current packet since it is already been sent. currentPacket.releaseBuffer(byteArrayManager); @@ -586,44 +583,42 @@ private void flushOrSync(boolean isSync, EnumSet syncFlags) } if (currentPacket != null) { currentPacket.setSyncBlock(isSync); - getStreamer().waitAndQueuePacket(currentPacket); + streamer.waitAndQueuePacket(currentPacket); currentPacket = null; } - if (endBlock && getStreamer().getBytesCurBlock() > 0) { + if (endBlock && streamer.getBytesCurBlock() > 0) { // Need to end the current block, thus send an empty packet to // indicate this is the end of the block and reset bytesCurBlock - currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), - getStreamer().getAndIncCurrentSeqno(), true); + currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), + streamer.getAndIncCurrentSeqno(), true); currentPacket.setSyncBlock(shouldSyncBlock || isSync); - getStreamer().waitAndQueuePacket(currentPacket); + streamer.waitAndQueuePacket(currentPacket); currentPacket = null; - getStreamer().setBytesCurBlock(0); + streamer.setBytesCurBlock(0); lastFlushOffset = 0; } else { // Restore state of stream. Record the last flush offset // of the last full chunk that was flushed. - getStreamer().setBytesCurBlock( - getStreamer().getBytesCurBlock() - numKept); + streamer.setBytesCurBlock(streamer.getBytesCurBlock() - numKept); } - toWaitFor = getStreamer().getLastQueuedSeqno(); + toWaitFor = streamer.getLastQueuedSeqno(); } // end synchronized - getStreamer().waitForAckedSeqno(toWaitFor); + streamer.waitForAckedSeqno(toWaitFor); // update the block length first time irrespective of flag - if (updateLength || getStreamer().getPersistBlocks().get()) { + if (updateLength || streamer.getPersistBlocks().get()) { synchronized (this) { - if (!getStreamer().streamerClosed() - && getStreamer().getBlock() != null) { - lastBlockLength = getStreamer().getBlock().getNumBytes(); + if (!streamer.streamerClosed() && streamer.getBlock() != null) { + lastBlockLength = streamer.getBlock().getNumBytes(); } } } // If 1) any new blocks were allocated since the last flush, or 2) to // update length in NN is required, then persist block locations on // namenode. 
- if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) { + if (streamer.getPersistBlocks().getAndSet(false) || updateLength) { try { dfsClient.namenode.fsync(src, fileId, dfsClient.clientName, lastBlockLength); @@ -640,8 +635,8 @@ private void flushOrSync(boolean isSync, EnumSet syncFlags) } synchronized(this) { - if (!getStreamer().streamerClosed()) { - getStreamer().setHflush(); + if (!streamer.streamerClosed()) { + streamer.setHflush(); } } } catch (InterruptedIOException interrupt) { @@ -653,7 +648,7 @@ private void flushOrSync(boolean isSync, EnumSet syncFlags) DFSClient.LOG.warn("Error while syncing", e); synchronized (this) { if (!isClosed()) { - getStreamer().getLastException().set(e); + streamer.getLastException().set(e); closeThreads(true); } } @@ -678,10 +673,10 @@ public synchronized int getNumCurrentReplicas() throws IOException { public synchronized int getCurrentBlockReplication() throws IOException { dfsClient.checkOpen(); checkClosed(); - if (getStreamer().streamerClosed()) { + if (streamer.streamerClosed()) { return blockReplication; // no pipeline, return repl factor of file } - DatanodeInfo[] currentNodes = getStreamer().getNodes(); + DatanodeInfo[] currentNodes = streamer.getNodes(); if (currentNodes == null) { return blockReplication; // no pipeline, return repl factor of file } @@ -700,16 +695,16 @@ protected void flushInternal() throws IOException { // // If there is data in the current buffer, send it across // - getStreamer().queuePacket(currentPacket); + streamer.queuePacket(currentPacket); currentPacket = null; - toWaitFor = getStreamer().getLastQueuedSeqno(); + toWaitFor = streamer.getLastQueuedSeqno(); } - getStreamer().waitForAckedSeqno(toWaitFor); + streamer.waitForAckedSeqno(toWaitFor); } protected synchronized void start() { - getStreamer().start(); + streamer.start(); } /** @@ -720,32 +715,32 @@ synchronized void abort() throws IOException { if (isClosed()) { return; } - getStreamer().getLastException().set(new IOException("Lease timeout of " + streamer.getLastException().set(new IOException("Lease timeout of " + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired.")); closeThreads(true); dfsClient.endFileLease(fileId); } boolean isClosed() { - return closed || getStreamer().streamerClosed(); + return closed || streamer.streamerClosed(); } void setClosed() { closed = true; - getStreamer().release(); + streamer.release(); } // shutdown datastreamer and responseprocessor threads. 
// interrupt datastreamer if force is true protected void closeThreads(boolean force) throws IOException { try { - getStreamer().close(force); - getStreamer().join(); - getStreamer().closeSocket(); + streamer.close(force); + streamer.join(); + streamer.closeSocket(); } catch (InterruptedException e) { throw new IOException("Failed to shutdown streamer"); } finally { - getStreamer().setSocketToNull(); + streamer.setSocketToNull(); setClosed(); } } @@ -767,7 +762,7 @@ public synchronized void close() throws IOException { protected synchronized void closeImpl() throws IOException { if (isClosed()) { - getStreamer().getLastException().check(true); + streamer.getLastException().check(true); return; } @@ -775,20 +770,20 @@ protected synchronized void closeImpl() throws IOException { flushBuffer(); // flush from all upper layers if (currentPacket != null) { - getStreamer().waitAndQueuePacket(currentPacket); + streamer.waitAndQueuePacket(currentPacket); currentPacket = null; } - if (getStreamer().getBytesCurBlock() != 0) { + if (streamer.getBytesCurBlock() != 0) { // send an empty packet to mark the end of the block - currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), - getStreamer().getAndIncCurrentSeqno(), true); + currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), + streamer.getAndIncCurrentSeqno(), true); currentPacket.setSyncBlock(shouldSyncBlock); } flushInternal(); // flush all data to Datanodes // get last block before destroying the streamer - ExtendedBlock lastBlock = getStreamer().getBlock(); + ExtendedBlock lastBlock = streamer.getBlock(); closeThreads(false); TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); try { @@ -846,7 +841,7 @@ protected void completeFile(ExtendedBlock last) throws IOException { @VisibleForTesting public void setArtificialSlowdown(long period) { - getStreamer().setArtificialSlowdown(period); + streamer.setArtificialSlowdown(period); } @VisibleForTesting @@ -873,7 +868,7 @@ public FileEncryptionInfo getFileEncryptionInfo() { * Returns the access token currently used by streamer, for testing only */ synchronized Token getBlockToken() { - return getStreamer().getBlockToken(); + return streamer.getBlockToken(); } @Override @@ -890,25 +885,11 @@ public void setDropBehind(Boolean dropBehind) throws IOException { @VisibleForTesting ExtendedBlock getBlock() { - return getStreamer().getBlock(); + return streamer.getBlock(); } @VisibleForTesting public long getFileId() { return fileId; } - - /** - * Set the data streamer object. - */ - protected synchronized void setStreamer(DataStreamer streamer) { - this.streamer = streamer; - } - - /** - * Returns the data streamer object. - */ - protected synchronized DataStreamer getStreamer() { - return streamer; - } } diff --git hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java new file mode 100644 index 0000000..4dbf384 --- /dev/null +++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+import java.util.*;
+
+/**
+ * The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The strategy is that it tries its best to place the replicas to most racks.
+ */
+@InterfaceAudience.Private
+public class BlockPlacementPolicyRackFaultTolarent extends BlockPlacementPolicyDefault {
+
+  @Override
+  protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = numOfChosen + numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+    // No calculation needed when there is only one rack or picking one node.
+    int numOfRacks = clusterMap.getNumOfRacks();
+    if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
+      return new int[] {numOfReplicas, totalNumOfReplicas};
+    }
+    if(totalNumOfReplicas
+   * In the end, the difference of the numbers of replicas for each two racks
+   * is no more than 1.
+   * Either way it always prefer local storage.
+   * @return local node of writer
+   */
+  @Override
+  protected Node chooseTargetInOrder(int numOfReplicas,
+                                     Node writer,
+                                     final Set<Node> excludedNodes,
+                                     final long blocksize,
+                                     final int maxNodesPerRack,
+                                     final List<DatanodeStorageInfo> results,
+                                     final boolean avoidStaleNodes,
+                                     final boolean newBlock,
+                                     EnumMap<StorageType, Integer> storageTypes)
+                                     throws NotEnoughReplicasException {
+    int totalReplicaExpected = results.size() + numOfReplicas;
+    int numOfRacks = clusterMap.getNumOfRacks();
+    if (totalReplicaExpected < numOfRacks ||
+        totalReplicaExpected % numOfRacks == 0) {
+      writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+      return writer;
+    }
+
+    assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;
+
+    // Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1)
+    // replicas.
+    HashMap<String, Integer> rackCounts = new HashMap<>();
+    for (DatanodeStorageInfo dsInfo : results) {
+      String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
+      Integer count = rackCounts.get(rack);
+      if (count != null) {
+        rackCounts.put(rack, count + 1);
+      } else {
+        rackCounts.put(rack, 1);
+      }
+    }
+    int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results
+    for (int count : rackCounts.values()) {
+      if (count > maxNodesPerRack -1) {
+        excess += count - (maxNodesPerRack -1);
+      }
+    }
+    numOfReplicas = Math.min(totalReplicaExpected - results.size(),
+        (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
+
+    // Fill each rack exactly (maxNodesPerRack-1) replicas.
+    writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes),
+        blocksize, maxNodesPerRack -1, results, avoidStaleNodes, storageTypes);
+
+    for (DatanodeStorageInfo resultStorage : results) {
+      addToExcludedNodes(resultStorage.getDatanodeDescriptor(), excludedNodes);
+    }
+
+    // For some racks, place one more replica to each one of them.
+    numOfReplicas = totalReplicaExpected - results.size();
+    chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+
+    return writer;
+  }
+
+  /**
+   * Randomly choose numOfReplicas targets from the given scope.
+   * Except that 1st replica prefer local storage.
+   * @return local node of writer.
+   */
+  private Node chooseOnce(int numOfReplicas,
+                          Node writer,
+                          final Set<Node> excludedNodes,
+                          final long blocksize,
+                          final int maxNodesPerRack,
+                          final List<DatanodeStorageInfo> results,
+                          final boolean avoidStaleNodes,
+                          EnumMap<StorageType, Integer> storageTypes)
+                          throws NotEnoughReplicasException {
+    if (numOfReplicas == 0) {
+      return writer;
+    }
+    writer = chooseLocalStorage(writer, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
+        .getDatanodeDescriptor();
+    if (--numOfReplicas == 0) {
+      return writer;
+    }
+    chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+    return writer;
+  }
+}
diff --git hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
deleted file mode 100644
index f25fb15..0000000
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-
-import java.util.*;
-
-/**
- * The class is responsible for choosing the desired number of targets
- * for placing block replicas.
- * The strategy is that it tries its best to place the replicas to most racks.
- */ -@InterfaceAudience.Private -public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyDefault { - - @Override - protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) { - int clusterSize = clusterMap.getNumOfLeaves(); - int totalNumOfReplicas = numOfChosen + numOfReplicas; - if (totalNumOfReplicas > clusterSize) { - numOfReplicas -= (totalNumOfReplicas-clusterSize); - totalNumOfReplicas = clusterSize; - } - // No calculation needed when there is only one rack or picking one node. - int numOfRacks = clusterMap.getNumOfRacks(); - if (numOfRacks == 1 || totalNumOfReplicas <= 1) { - return new int[] {numOfReplicas, totalNumOfReplicas}; - } - if(totalNumOfReplicas - * In the end, the difference of the numbers of replicas for each two racks - * is no more than 1. - * Either way it always prefer local storage. - * @return local node of writer - */ - @Override - protected Node chooseTargetInOrder(int numOfReplicas, - Node writer, - final Set excludedNodes, - final long blocksize, - final int maxNodesPerRack, - final List results, - final boolean avoidStaleNodes, - final boolean newBlock, - EnumMap storageTypes) - throws NotEnoughReplicasException { - int totalReplicaExpected = results.size() + numOfReplicas; - int numOfRacks = clusterMap.getNumOfRacks(); - if (totalReplicaExpected < numOfRacks || - totalReplicaExpected % numOfRacks == 0) { - writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes); - return writer; - } - - assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks; - - // Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1) - // replicas. - HashMap rackCounts = new HashMap<>(); - for (DatanodeStorageInfo dsInfo : results) { - String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation(); - Integer count = rackCounts.get(rack); - if (count != null) { - rackCounts.put(rack, count + 1); - } else { - rackCounts.put(rack, 1); - } - } - int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results - for (int count : rackCounts.values()) { - if (count > maxNodesPerRack -1) { - excess += count - (maxNodesPerRack -1); - } - } - numOfReplicas = Math.min(totalReplicaExpected - results.size(), - (maxNodesPerRack -1) * numOfRacks - (results.size() - excess)); - - // Fill each rack exactly (maxNodesPerRack-1) replicas. - writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes), - blocksize, maxNodesPerRack -1, results, avoidStaleNodes, storageTypes); - - for (DatanodeStorageInfo resultStorage : results) { - addToExcludedNodes(resultStorage.getDatanodeDescriptor(), excludedNodes); - } - - // For some racks, place one more replica to each one of them. - numOfReplicas = totalReplicaExpected - results.size(); - chooseOnce(numOfReplicas, writer, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes); - - return writer; - } - - /** - * Randomly choose numOfReplicas targets from the given scope. - * Except that 1st replica prefer local storage. - * @return local node of writer. 
- */ - private Node chooseOnce(int numOfReplicas, - Node writer, - final Set excludedNodes, - final long blocksize, - final int maxNodesPerRack, - final List results, - final boolean avoidStaleNodes, - EnumMap storageTypes) - throws NotEnoughReplicasException { - if (numOfReplicas == 0) { - return writer; - } - writer = chooseLocalStorage(writer, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes, true) - .getDatanodeDescriptor(); - if (--numOfReplicas == 0) { - return writer; - } - chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes); - return writer; - } -} diff --git hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index f73eb66..d2b2939 100644 --- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1370,9 +1370,9 @@ void initBlockPool(BPOfferService bpos) throws IOException { // failures. checkDiskError(); + initDirectoryScanner(conf); data.addBlockPool(nsInfo.getBlockPoolID(), conf); blockScanner.enableBlockPoolId(bpos.getBlockPoolId()); - initDirectoryScanner(conf); } List getAllBpOs() { diff --git hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 94aaf21..951c759 100644 --- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -550,28 +550,10 @@ ReplicaInfo resolveDuplicateReplicas( // Leave both block replicas in place. return replica1; } - final ReplicaInfo replicaToDelete = - selectReplicaToDelete(replica1, replica2); - final ReplicaInfo replicaToKeep = - (replicaToDelete != replica1) ? replica1 : replica2; - // Update volumeMap and delete the replica - volumeMap.add(bpid, replicaToKeep); - if (replicaToDelete != null) { - deleteReplica(replicaToDelete); - } - return replicaToKeep; - } - static ReplicaInfo selectReplicaToDelete(final ReplicaInfo replica1, - final ReplicaInfo replica2) { ReplicaInfo replicaToKeep; ReplicaInfo replicaToDelete; - // it's the same block so don't ever delete it, even if GS or size - // differs. caller should keep the one it just discovered on disk - if (replica1.getBlockFile().equals(replica2.getBlockFile())) { - return null; - } if (replica1.getGenerationStamp() != replica2.getGenerationStamp()) { replicaToKeep = replica1.getGenerationStamp() > replica2.getGenerationStamp() ? replica1 : replica2; @@ -591,10 +573,10 @@ static ReplicaInfo selectReplicaToDelete(final ReplicaInfo replica1, LOG.debug("resolveDuplicateReplicas decide to keep " + replicaToKeep + ". Will try to delete " + replicaToDelete); } - return replicaToDelete; - } - private void deleteReplica(final ReplicaInfo replicaToDelete) { + // Update volumeMap. + volumeMap.add(bpid, replicaToKeep); + // Delete the files on disk. Failure here is okay. 
final File blockFile = replicaToDelete.getBlockFile(); if (!blockFile.delete()) { @@ -604,8 +586,10 @@ private void deleteReplica(final ReplicaInfo replicaToDelete) { if (!metaFile.delete()) { LOG.warn("Failed to delete meta file " + metaFile); } - } + return replicaToKeep; + } + /** * Find out the number of bytes in the block that match its crc. * diff --git hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 59c7ade..9f4f700 100644 --- hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -51,7 +51,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; -import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -67,8 +66,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyListOf; @@ -414,35 +411,4 @@ public void testDeletingBlocks() throws IOException { cluster.shutdown(); } } - - @Test - public void testDuplicateReplicaResolution() throws IOException { - FsVolumeImpl fsv1 = Mockito.mock(FsVolumeImpl.class); - FsVolumeImpl fsv2 = Mockito.mock(FsVolumeImpl.class); - - File f1 = new File("d1/block"); - File f2 = new File("d2/block"); - - ReplicaInfo replicaOlder = new FinalizedReplica(1,1,1,fsv1,f1); - ReplicaInfo replica = new FinalizedReplica(1,2,2,fsv1,f1); - ReplicaInfo replicaSame = new FinalizedReplica(1,2,2,fsv1,f1); - ReplicaInfo replicaNewer = new FinalizedReplica(1,3,3,fsv1,f1); - - ReplicaInfo replicaOtherOlder = new FinalizedReplica(1,1,1,fsv2,f2); - ReplicaInfo replicaOtherSame = new FinalizedReplica(1,2,2,fsv2,f2); - ReplicaInfo replicaOtherNewer = new FinalizedReplica(1,3,3,fsv2,f2); - - // equivalent path so don't remove either - assertNull(BlockPoolSlice.selectReplicaToDelete(replicaSame, replica)); - assertNull(BlockPoolSlice.selectReplicaToDelete(replicaOlder, replica)); - assertNull(BlockPoolSlice.selectReplicaToDelete(replicaNewer, replica)); - - // keep latest found replica - assertSame(replica, - BlockPoolSlice.selectReplicaToDelete(replicaOtherSame, replica)); - assertSame(replicaOtherOlder, - BlockPoolSlice.selectReplicaToDelete(replicaOtherOlder, replica)); - assertSame(replica, - BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica)); - } } diff --git hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java new file mode 100644 index 0000000..d86a267 --- /dev/null +++ hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolarent.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolarent; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.net.StaticMapping; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.*; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestBlockPlacementPolicyRackFaultTolarent { + + private static final int DEFAULT_BLOCK_SIZE = 1024; + private MiniDFSCluster cluster = null; + private NamenodeProtocols nameNodeRpc = null; + private FSNamesystem namesystem = null; + private PermissionStatus perm = null; + + @Before + public void setup() throws IOException { + StaticMapping.resetMap(); + Configuration conf = new HdfsConfiguration(); + final ArrayList rackList = new ArrayList(); + final ArrayList hostList = new ArrayList(); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 2; j++) { + rackList.add("/rack" + i); + hostList.add("/host" + i + j); + } + } + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + BlockPlacementPolicyRackFaultTolarent.class, + BlockPlacementPolicy.class); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2); + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(hostList.size()) + .racks(rackList.toArray(new String[rackList.size()])) + .hosts(hostList.toArray(new String[hostList.size()])) + .build(); + cluster.waitActive(); + nameNodeRpc = cluster.getNameNodeRpc(); + namesystem = cluster.getNamesystem(); + perm = new PermissionStatus("TestBlockPlacementPolicyEC", null, + FsPermission.getDefault()); + } + + @After + public void teardown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testChooseTarget() throws Exception { + doTestChooseTargetNormalCase(); + doTestChooseTargetSpecialCase(); + } + + private void doTestChooseTargetNormalCase() throws Exception { + String clientMachine = "client.foo.com"; + short[][] testSuite = { + {3, 2}, {3, 7}, {3, 8}, {3, 10}, {9, 
1}, {10, 1}, {10, 6}, {11, 6}, + {11, 9} + }; + // Test 5 files + int fileCount = 0; + for (int i = 0; i < 5; i++) { + for (short[] testCase : testSuite) { + short replication = testCase[0]; + short additionalReplication = testCase[1]; + String src = "/testfile" + (fileCount++); + // Create the file with client machine + HdfsFileStatus fileStatus = namesystem.startFile(src, perm, + clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, + replication, DEFAULT_BLOCK_SIZE, null, false); + + //test chooseTarget for new file + LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, + null, null, fileStatus.getFileId(), null); + doTestLocatedBlock(replication, locatedBlock); + + //test chooseTarget for existing file. + LocatedBlock additionalLocatedBlock = + nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(), + locatedBlock.getBlock(), locatedBlock.getLocations(), + locatedBlock.getStorageIDs(), new DatanodeInfo[0], + additionalReplication, clientMachine); + doTestLocatedBlock(replication + additionalReplication, additionalLocatedBlock); + } + } + } + + /** + * Test more randomly. So it covers some special cases. + * Like when some racks already have 2 replicas, while some racks have none, + * we should choose the racks that have none. + */ + private void doTestChooseTargetSpecialCase() throws Exception { + String clientMachine = "client.foo.com"; + // Test 5 files + String src = "/testfile_1_"; + // Create the file with client machine + HdfsFileStatus fileStatus = namesystem.startFile(src, perm, + clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, + (short) 20, DEFAULT_BLOCK_SIZE, null, false); + + //test chooseTarget for new file + LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, + null, null, fileStatus.getFileId(), null); + doTestLocatedBlock(20, locatedBlock); + + DatanodeInfo[] locs = locatedBlock.getLocations(); + String[] storageIDs = locatedBlock.getStorageIDs(); + + for (int time = 0; time < 5; time++) { + shuffle(locs, storageIDs); + for (int i = 1; i < locs.length; i++) { + DatanodeInfo[] partLocs = new DatanodeInfo[i]; + String[] partStorageIDs = new String[i]; + System.arraycopy(locs, 0, partLocs, 0, i); + System.arraycopy(storageIDs, 0, partStorageIDs, 0, i); + for (int j = 1; j < 20 - i; j++) { + LocatedBlock additionalLocatedBlock = + nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(), + locatedBlock.getBlock(), partLocs, + partStorageIDs, new DatanodeInfo[0], + j, clientMachine); + doTestLocatedBlock(i + j, additionalLocatedBlock); + } + } + } + } + + private void shuffle(DatanodeInfo[] locs, String[] storageIDs) { + int length = locs.length; + Object[][] pairs = new Object[length][]; + for (int i = 0; i < length; i++) { + pairs[i] = new Object[]{locs[i], storageIDs[i]}; + } + DFSUtil.shuffle(pairs); + for (int i = 0; i < length; i++) { + locs[i] = (DatanodeInfo) pairs[i][0]; + storageIDs[i] = (String) pairs[i][1]; + } + } + + private void doTestLocatedBlock(int replication, LocatedBlock locatedBlock) { + assertEquals(replication, locatedBlock.getLocations().length); + + HashMap racksCount = new HashMap(); + for (DatanodeInfo node : + locatedBlock.getLocations()) { + addToRacksCount(node.getNetworkLocation(), racksCount); + } + + int minCount = Integer.MAX_VALUE; + int maxCount = Integer.MIN_VALUE; + for (Integer rackCount : racksCount.values()) { + minCount = Math.min(minCount, rackCount); + maxCount = Math.max(maxCount, rackCount); + } + assertTrue(maxCount - minCount <= 1); + } + + private 
void addToRacksCount(String rack, HashMap racksCount) { + Integer count = racksCount.get(rack); + if (count == null) { + racksCount.put(rack, 1); + } else { + racksCount.put(rack, count + 1); + } + } +} diff --git hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java deleted file mode 100644 index ca9da77..0000000 --- hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; -import org.apache.hadoop.net.StaticMapping; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.*; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestBlockPlacementPolicyRackFaultTolerant { - - private static final int DEFAULT_BLOCK_SIZE = 1024; - private MiniDFSCluster cluster = null; - private NamenodeProtocols nameNodeRpc = null; - private FSNamesystem namesystem = null; - private PermissionStatus perm = null; - - @Before - public void setup() throws IOException { - StaticMapping.resetMap(); - Configuration conf = new HdfsConfiguration(); - final ArrayList rackList = new ArrayList(); - final ArrayList hostList = new ArrayList(); - for (int i = 0; i < 10; i++) { - for (int j = 0; j < 2; j++) { - rackList.add("/rack" + i); - hostList.add("/host" + i + j); - } - } - conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, - BlockPlacementPolicyRackFaultTolerant.class, - BlockPlacementPolicy.class); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); - conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2); - 
cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(hostList.size()) - .racks(rackList.toArray(new String[rackList.size()])) - .hosts(hostList.toArray(new String[hostList.size()])) - .build(); - cluster.waitActive(); - nameNodeRpc = cluster.getNameNodeRpc(); - namesystem = cluster.getNamesystem(); - perm = new PermissionStatus("TestBlockPlacementPolicyEC", null, - FsPermission.getDefault()); - } - - @After - public void teardown() { - if (cluster != null) { - cluster.shutdown(); - } - } - - @Test - public void testChooseTarget() throws Exception { - doTestChooseTargetNormalCase(); - doTestChooseTargetSpecialCase(); - } - - private void doTestChooseTargetNormalCase() throws Exception { - String clientMachine = "client.foo.com"; - short[][] testSuite = { - {3, 2}, {3, 7}, {3, 8}, {3, 10}, {9, 1}, {10, 1}, {10, 6}, {11, 6}, - {11, 9} - }; - // Test 5 files - int fileCount = 0; - for (int i = 0; i < 5; i++) { - for (short[] testCase : testSuite) { - short replication = testCase[0]; - short additionalReplication = testCase[1]; - String src = "/testfile" + (fileCount++); - // Create the file with client machine - HdfsFileStatus fileStatus = namesystem.startFile(src, perm, - clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, - replication, DEFAULT_BLOCK_SIZE, null, false); - - //test chooseTarget for new file - LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, - null, null, fileStatus.getFileId(), null); - doTestLocatedBlock(replication, locatedBlock); - - //test chooseTarget for existing file. - LocatedBlock additionalLocatedBlock = - nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(), - locatedBlock.getBlock(), locatedBlock.getLocations(), - locatedBlock.getStorageIDs(), new DatanodeInfo[0], - additionalReplication, clientMachine); - doTestLocatedBlock(replication + additionalReplication, additionalLocatedBlock); - } - } - } - - /** - * Test more randomly. So it covers some special cases. - * Like when some racks already have 2 replicas, while some racks have none, - * we should choose the racks that have none. 
- */ - private void doTestChooseTargetSpecialCase() throws Exception { - String clientMachine = "client.foo.com"; - // Test 5 files - String src = "/testfile_1_"; - // Create the file with client machine - HdfsFileStatus fileStatus = namesystem.startFile(src, perm, - clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, - (short) 20, DEFAULT_BLOCK_SIZE, null, false); - - //test chooseTarget for new file - LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, - null, null, fileStatus.getFileId(), null); - doTestLocatedBlock(20, locatedBlock); - - DatanodeInfo[] locs = locatedBlock.getLocations(); - String[] storageIDs = locatedBlock.getStorageIDs(); - - for (int time = 0; time < 5; time++) { - shuffle(locs, storageIDs); - for (int i = 1; i < locs.length; i++) { - DatanodeInfo[] partLocs = new DatanodeInfo[i]; - String[] partStorageIDs = new String[i]; - System.arraycopy(locs, 0, partLocs, 0, i); - System.arraycopy(storageIDs, 0, partStorageIDs, 0, i); - for (int j = 1; j < 20 - i; j++) { - LocatedBlock additionalLocatedBlock = - nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(), - locatedBlock.getBlock(), partLocs, - partStorageIDs, new DatanodeInfo[0], - j, clientMachine); - doTestLocatedBlock(i + j, additionalLocatedBlock); - } - } - } - } - - private void shuffle(DatanodeInfo[] locs, String[] storageIDs) { - int length = locs.length; - Object[][] pairs = new Object[length][]; - for (int i = 0; i < length; i++) { - pairs[i] = new Object[]{locs[i], storageIDs[i]}; - } - DFSUtil.shuffle(pairs); - for (int i = 0; i < length; i++) { - locs[i] = (DatanodeInfo) pairs[i][0]; - storageIDs[i] = (String) pairs[i][1]; - } - } - - private void doTestLocatedBlock(int replication, LocatedBlock locatedBlock) { - assertEquals(replication, locatedBlock.getLocations().length); - - HashMap racksCount = new HashMap(); - for (DatanodeInfo node : - locatedBlock.getLocations()) { - addToRacksCount(node.getNetworkLocation(), racksCount); - } - - int minCount = Integer.MAX_VALUE; - int maxCount = Integer.MIN_VALUE; - for (Integer rackCount : racksCount.values()) { - minCount = Math.min(minCount, rackCount); - maxCount = Math.max(maxCount, rackCount); - } - assertTrue(maxCount - minCount <= 1); - } - - private void addToRacksCount(String rack, HashMap racksCount) { - Integer count = racksCount.get(rack); - if (count == null) { - racksCount.put(rack, 1); - } else { - racksCount.put(rack, count + 1); - } - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index f74106a..9594285 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -23,6 +23,9 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory;
@@ -44,36 +47,64 @@
   private static final Log LOG = LogFactory.getLog(
       FSParentQueue.class.getName());
-  private final List<FSQueue> childQueues =
-      new ArrayList<FSQueue>();
+  private final List<FSQueue> childQueues = new ArrayList<>();
   private Resource demand = Resources.createResource(0);
   private int runnableApps;
-
+
+  private ReadWriteLock rwLock = new ReentrantReadWriteLock();
+  private Lock readLock = rwLock.readLock();
+  private Lock writeLock = rwLock.writeLock();
+
   public FSParentQueue(String name, FairScheduler scheduler,
       FSParentQueue parent) {
     super(name, scheduler, parent);
   }
 
   public void addChildQueue(FSQueue child) {
-    childQueues.add(child);
+    writeLock.lock();
+    try {
+      childQueues.add(child);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void removeChildQueue(FSQueue child) {
+    writeLock.lock();
+    try {
+      childQueues.remove(child);
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   @Override
   public void recomputeShares() {
-    policy.computeShares(childQueues, getFairShare());
-    for (FSQueue childQueue : childQueues) {
-      childQueue.getMetrics().setFairShare(childQueue.getFairShare());
-      childQueue.recomputeShares();
+    readLock.lock();
+    try {
+      policy.computeShares(childQueues, getFairShare());
+      for (FSQueue childQueue : childQueues) {
+        childQueue.getMetrics().setFairShare(childQueue.getFairShare());
+        childQueue.recomputeShares();
+      }
+    } finally {
+      readLock.unlock();
     }
   }
 
   public void recomputeSteadyShares() {
-    policy.computeSteadyShares(childQueues, getSteadyFairShare());
-    for (FSQueue childQueue : childQueues) {
-      childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
-      if (childQueue instanceof FSParentQueue) {
-        ((FSParentQueue) childQueue).recomputeSteadyShares();
+    readLock.lock();
+    try {
+      policy.computeSteadyShares(childQueues, getSteadyFairShare());
+      for (FSQueue childQueue : childQueues) {
+        childQueue.getMetrics()
+            .setSteadyFairShare(childQueue.getSteadyFairShare());
+        if (childQueue instanceof FSParentQueue) {
+          ((FSParentQueue) childQueue).recomputeSteadyShares();
+        }
       }
+    } finally {
+      readLock.unlock();
     }
   }
 
@@ -81,21 +112,37 @@ public void recomputeSteadyShares() {
   public void updatePreemptionVariables() {
     super.updatePreemptionVariables();
     // For child queues
-    for (FSQueue childQueue : childQueues) {
-      childQueue.updatePreemptionVariables();
+
+    readLock.lock();
+    try {
+      for (FSQueue childQueue : childQueues) {
+        childQueue.updatePreemptionVariables();
+      }
+    } finally {
+      readLock.unlock();
    }
   }
 
   @Override
   public Resource getDemand() {
-    return demand;
+    readLock.lock();
+    try {
+      return Resource.newInstance(demand.getMemory(), demand.getVirtualCores());
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
   public Resource getResourceUsage() {
     Resource usage = Resources.createResource(0);
-    for (FSQueue child : childQueues) {
-      Resources.addTo(usage, child.getResourceUsage());
+    readLock.lock();
+    try {
+      for (FSQueue child : childQueues) {
+        Resources.addTo(usage, child.getResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
    }
     return usage;
   }
@@ -106,20 +153,25 @@ public void updateDemand() {
     // Limit demand to maxResources
     Resource maxRes = scheduler.getAllocationConfiguration()
         .getMaxResources(getName());
-    demand = Resources.createResource(0);
-    for (FSQueue childQueue : childQueues) {
-      childQueue.updateDemand();
-      Resource toAdd = childQueue.getDemand();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Counting resource from " + childQueue.getName() + " " +
-            toAdd + "; Total resource consumption for " + getName() +
-            " now " + demand);
-      }
-      demand = Resources.add(demand, toAdd);
-      demand = Resources.componentwiseMin(demand, maxRes);
-      if (Resources.equals(demand, maxRes)) {
-        break;
+    writeLock.lock();
+    try {
+      demand = Resources.createResource(0);
+      for (FSQueue childQueue : childQueues) {
+        childQueue.updateDemand();
+        Resource toAdd = childQueue.getDemand();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Counting resource from " + childQueue.getName() + " " +
+              toAdd + "; Total resource consumption for " + getName() +
+              " now " + demand);
+        }
+        demand = Resources.add(demand, toAdd);
+        demand = Resources.componentwiseMin(demand, maxRes);
+        if (Resources.equals(demand, maxRes)) {
+          break;
+        }
       }
+    } finally {
+      writeLock.unlock();
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("The updated demand for " + getName() + " is " + demand +
@@ -127,33 +179,31 @@
     }
   }
 
-  private synchronized QueueUserACLInfo getUserAclInfo(
-      UserGroupInformation user) {
-    QueueUserACLInfo userAclInfo =
-        recordFactory.newRecordInstance(QueueUserACLInfo.class);
-    List<QueueACL> operations = new ArrayList<QueueACL>();
+  private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) {
+    List<QueueACL> operations = new ArrayList<>();
     for (QueueACL operation : QueueACL.values()) {
       if (hasAccess(operation, user)) {
        operations.add(operation);
      }
     }
-
-    userAclInfo.setQueueName(getQueueName());
-    userAclInfo.setUserAcls(operations);
-    return userAclInfo;
+    return QueueUserACLInfo.newInstance(getQueueName(), operations);
   }
 
   @Override
-  public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
-      UserGroupInformation user) {
+  public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
     List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
     // Add queue acls
     userAcls.add(getUserAclInfo(user));
 
     // Add children queue acls
-    for (FSQueue child : childQueues) {
-      userAcls.addAll(child.getQueueUserAclInfo(user));
+    readLock.lock();
+    try {
+      for (FSQueue child : childQueues) {
+        userAcls.addAll(child.getQueueUserAclInfo(user));
+      }
+    } finally {
+      readLock.unlock();
    }
 
     return userAcls;
@@ -168,12 +218,23 @@ public Resource assignContainer(FSSchedulerNode node) {
       return assigned;
     }
 
-    Collections.sort(childQueues, policy.getComparator());
-    for (FSQueue child : childQueues) {
-      assigned = child.assignContainer(node);
-      if (!Resources.equals(assigned, Resources.none())) {
-        break;
+    writeLock.lock();
+    try {
+      Collections.sort(childQueues, policy.getComparator());
+    } finally {
+      writeLock.unlock();
+    }
+
+    readLock.lock();
+    try {
+      for (FSQueue child : childQueues) {
+        assigned = child.assignContainer(node);
+        if (!Resources.equals(assigned, Resources.none())) {
+          break;
+        }
      }
+    } finally {
+      readLock.unlock();
    }
     return assigned;
   }
@@ -182,14 +243,23 @@ public Resource assignContainer(FSSchedulerNode node) {
   public RMContainer preemptContainer() {
     RMContainer toBePreempted = null;
 
-    // Find the childQueue which is most over fair share
+    /*
+     * Iterate through the queues and pick the one most over fairshare.
+     * TODO: If the queues are mostly sorted, we could just pick the last one.
+     */
     FSQueue candidateQueue = null;
     Comparator<Schedulable> comparator = policy.getComparator();
-    for (FSQueue queue : childQueues) {
-      if (candidateQueue == null ||
-          comparator.compare(queue, candidateQueue) > 0) {
-        candidateQueue = queue;
+
+    readLock.lock();
+    try {
+      for (FSQueue queue : childQueues) {
+        if (candidateQueue == null ||
+            comparator.compare(queue, candidateQueue) > 0) {
+          candidateQueue = queue;
+        }
      }
+    } finally {
+      readLock.unlock();
    }
 
     // Let the selected queue choose which of its container to preempt
@@ -201,7 +271,12 @@ public RMContainer preemptContainer() {
   @Override
   public List<FSQueue> getChildQueues() {
-    return childQueues;
+    readLock.lock();
+    try {
+      return Collections.unmodifiableList(childQueues);
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
@@ -218,23 +293,43 @@ public void setPolicy(SchedulingPolicy policy)
   }
 
   public void incrementRunnableApps() {
-    runnableApps++;
+    writeLock.lock();
+    try {
+      runnableApps++;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   public void decrementRunnableApps() {
-    runnableApps--;
+    writeLock.lock();
+    try {
+      runnableApps--;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   @Override
   public int getNumRunnableApps() {
-    return runnableApps;
+    readLock.lock();
+    try {
+      return runnableApps;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
   public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
-    for (FSQueue childQueue : childQueues) {
-      childQueue.collectSchedulerApplications(apps);
+    readLock.lock();
+    try {
+      for (FSQueue childQueue : childQueues) {
+        childQueue.collectSchedulerApplications(apps);
+      }
+    } finally {
+      readLock.unlock();
    }
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 64442ab..6556717 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -304,7 +304,8 @@ private void removeQueue(FSQueue queue) {
       }
     }
     queues.remove(queue.getName());
-    queue.getParent().getChildQueues().remove(queue);
+    FSParentQueue parent = queue.getParent();
+    parent.removeChildQueue(queue);
   }
 
   /**
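The FSParentQueue hunks above guard the child-queue list and the runnable-app counters with a ReentrantReadWriteLock (reads take the read lock, mutations take the write lock), and the QueueManager hunk removes a child through the parent instead of mutating the list returned by getChildQueues(). The following standalone Java sketch illustrates that locking pattern under simplified assumptions; ParentQueueSketch and ChildQueue are illustrative stand-ins, not the actual YARN classes.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative stand-in for a child queue; not the YARN FSQueue class.
class ChildQueue {
  private final String name;
  ChildQueue(String name) { this.name = name; }
  String getName() { return name; }
}

// Minimal sketch of the locking discipline applied in the FSParentQueue hunks:
// mutations take the write lock, traversals take the read lock, and the getter
// hands out a read-only view instead of the internal list.
public class ParentQueueSketch {
  private final List<ChildQueue> childQueues = new ArrayList<>();
  private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
  private final Lock readLock = rwLock.readLock();
  private final Lock writeLock = rwLock.writeLock();

  public void addChildQueue(ChildQueue child) {
    writeLock.lock();
    try {
      childQueues.add(child);
    } finally {
      writeLock.unlock();
    }
  }

  public void removeChildQueue(ChildQueue child) {
    writeLock.lock();
    try {
      childQueues.remove(child);
    } finally {
      writeLock.unlock();
    }
  }

  public List<ChildQueue> getChildQueues() {
    readLock.lock();
    try {
      // Callers get a read-only view, so they can no longer remove children
      // behind the parent's back (the motivation for the QueueManager change).
      return Collections.unmodifiableList(childQueues);
    } finally {
      readLock.unlock();
    }
  }

  public static void main(String[] args) {
    ParentQueueSketch parent = new ParentQueueSketch();
    ChildQueue child = new ChildQueue("root.child");
    parent.addChildQueue(child);
    parent.removeChildQueue(child);
    System.out.println(parent.getChildQueues().size()); // prints 0
  }
}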