From 9e5f631277c9dbf86a34c10e9af222fc35ed028a Mon Sep 17 00:00:00 2001 From: stack Date: Mon, 7 Mar 2011 11:03:23 -0800 Subject: [PATCH] Added HDFS-1520, HDFS-1555, and HDFS-1554 from the tip of branch-0.20-append to get us the new NN recoverLease API --- CHANGES.txt | 47 +++++ src/docs/cn/src/documentation/sitemap.xmap | 2 +- src/docs/cn/uming.conf | 2 +- src/hdfs/org/apache/hadoop/hdfs/DFSClient.java | 17 ++ .../org/apache/hadoop/hdfs/DFSClient.java.orig | 107 +++++++++-- .../apache/hadoop/hdfs/DistributedFileSystem.java | 11 + .../hadoop/hdfs/protocol/ClientProtocol.java | 14 ++- .../hadoop/hdfs/server/datanode/DataNode.java | 2 +- .../hadoop/hdfs/server/namenode/FSDirectory.java | 17 ++ .../hadoop/hdfs/server/namenode/FSNamesystem.java | 160 +++++++++++----- .../hadoop/hdfs/server/namenode/LeaseManager.java | 6 + .../hadoop/hdfs/server/namenode/NameNode.java | 10 +- .../hdfs/server/protocol/DatanodeProtocol.java | 16 +- .../apache/hadoop/hdfs/TestDFSClientRetries.java | 3 + .../org/apache/hadoop/hdfs/TestLeaseRecovery2.java | 207 +++++++++++++------- .../hdfs/server/namenode/NameNodeAdapter.java | 2 +- 16 files changed, 470 insertions(+), 153 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index b77d5f1..e1fb51d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,51 @@ Hadoop Change Log + SU + (Latest to oldest) + + commit df0d79cc2b09438c079fdf10b913936492117917 + Author: Hairong Kuang + Date: Mon Jan 10 19:01:36 2011 +0000 + + HDFS-1554. New semantics for recoverLease. Contributed by Hairong Kuang. + + + git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-append@1057313 13f79535-47bb-0310-9956-ffa450edef68 + + commit e499be8c74a871ef06aa8a74c621e68d50167d3d + Author: Hairong Kuang + Date: Fri Jan 7 20:25:23 2011 +0000 + + HDFS-1555. Disallow pipeline recovery if a file is already being lease recovered. Contributed by Hairong Kuang. + + + git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-append@1056491 13f79535-47bb-0310-9956-ffa450edef68 + + commit 966ef38cb13d75ada40de2b9951496ddcb6918a4 + Author: Hairong Kuang + Date: Fri Jan 7 20:19:40 2011 +0000 + + Revert the change made to HDFS-1555: merge -c -1056483 https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-append + + git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-append@1056489 13f79535-47bb-0310-9956-ffa450edef68 + + commit f9436bf6bd2f5bc4f8cebe2781c5ce628db31239 + Author: Hairong Kuang + Date: Fri Jan 7 20:11:38 2011 +0000 + + HDFS-1555. Disallow pipeline recovery if a file is already being lease recovered. Contributed by Hairong Kuang. + + + git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-append@1056483 13f79535-47bb-0310-9956-ffa450edef68 + + commit 9a3094f3a085817d377ce69fb8eb002f966f992b + Author: Dhruba Borthakur + Date: Fri Dec 10 05:38:06 2010 +0000 + + HDFS-1520. Lightweight NameNode operation recoverLease to trigger + lease recovery. (Hairong Kuang via dhruba) + + Release 0.20.3 - Unreleased Release 0.20.2 - 2010-2-19 @@ -65,6 +111,7 @@ Release 0.20.2 - 2010-2-19 HADOOP-6576. Fix streaming test failures on 0.20. (Todd Lipcon via cdouglas) + IMPROVEMENTS HADOOP-5611. Fix C++ libraries to build on Debian Lenny. (Todd Lipcon
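Before the diffs, a minimal sketch of how a client is expected to drive the new recoverLease API described in the commits above. Illustrative only, not part of the patch: the path and sleep interval are invented, it assumes fs.default.name points at a cluster carrying these changes, and the polling loop mirrors TestLeaseRecovery2 later in this patch.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.DistributedFileSystem;

  public class RecoverLeaseSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // assumes the default FileSystem is an HDFS cluster with this patch applied
      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
      Path file = new Path("/logs/wal.0001");  // hypothetical file of a crashed writer
      // recoverLease returns true once the file is closed; false means the
      // NameNode has only started recovery, so keep polling.
      while (!dfs.recoverLease(file)) {
        Thread.sleep(5000);
      }
      // the old writer's lease is revoked and the file is closed; safe to read
    }
  }
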
diff --git a/src/docs/cn/src/documentation/sitemap.xmap b/src/docs/cn/src/documentation/sitemap.xmap index 5aad89d..05e991c 100644 --- a/src/docs/cn/src/documentation/sitemap.xmap +++ b/src/docs/cn/src/documentation/sitemap.xmap @@ -21,7 +21,7 @@ - + diff --git a/src/docs/cn/uming.conf b/src/docs/cn/uming.conf index 0e124df..78d85f0 100644 --- a/src/docs/cn/uming.conf +++ b/src/docs/cn/uming.conf @@ -1,7 +1,7 @@ - + diff --git a/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java b/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java index 227502e..c975b57 100644 --- a/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java +++ b/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java @@ -507,6 +507,23 @@ public class DFSClient implements FSConstants, java.io.Closeable { } /** + * Recover a file's lease + * @param src a file's path + * @return true if the file is already closed + * @throws IOException + */ + boolean recoverLease(String src) throws IOException { + checkOpen(); + + try { + return namenode.recoverLease(src, clientName); + } catch (RemoteException re) { + throw re.unwrapRemoteException(FileNotFoundException.class, + AccessControlException.class); + } + } + + /** * Append to an existing HDFS file. * * @param src file name diff --git a/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java.orig b/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java.orig index 1fe6e22..5a984ce 100644 --- a/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java.orig +++ b/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java.orig @@ -83,6 +83,8 @@ public class DFSClient implements FSConstants, java.io.Closeable { final int writePacketSize; private final FileSystem.Statistics stats; private int maxBlockAcquireFailures; + private boolean shortCircuitLocalReads = false; + private final InetAddress localHost; /** * We assume we're talking to another CDH server, which supports @@ -192,6 +194,7 @@ public class DFSClient implements FSConstants, java.io.Closeable { // dfs.write.packet.size is an internal config variable this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024); this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf); + this.localHost = InetAddress.getLocalHost(); try { this.ugi = UnixUserGroupInformation.login(conf, true); @@ -219,6 +222,8 @@ public class DFSClient implements FSConstants, java.io.Closeable { "Expecting exactly one of nameNodeAddr and rpcNamenode being null: " + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode); } + // read directly from the block file if configured. + this.shortCircuitLocalReads = conf.getBoolean("dfs.read.shortcircuit", true); } static int getMaxBlockAcquireFailures(Configuration conf) { @@ -502,6 +507,22 @@ public class DFSClient implements FSConstants, java.io.Closeable { } /** + * Recover a file's lease + * @param src a file's path + * @throws IOException + */ + void recoverLease(String src) throws IOException { + checkOpen(); + + try { + namenode.recoverLease(src, clientName); + } catch (RemoteException re) { + throw re.unwrapRemoteException(FileNotFoundException.class, + AccessControlException.class); + } + } + + /** * Append to an existing HDFS file. * * @param src file name @@ -1153,7 +1174,7 @@ public class DFSClient implements FSConstants, java.io.Closeable { int nRead = super.read(buf, off, len); // if gotEOS was set in the previous read and checksum is enabled : - if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) { + if (dnSock != null && gotEOS && !eosBefore && nRead >= 0 && needChecksum()) { //checksum is verified and there are no errors.
checksumOk(dnSock); } @@ -1331,6 +1352,13 @@ public class DFSClient implements FSConstants, java.io.Closeable { checksumSize = this.checksum.getChecksumSize(); } + /** + * Public constructor + */ + BlockReader(String file, int numRetries) { + super(new Path(file), numRetries); + } + public static BlockReader newBlockReader(Socket sock, String file, long blockId, long genStamp, long startOffset, long len, int bufferSize) throws IOException { return newBlockReader(sock, file, blockId, genStamp, startOffset, len, bufferSize, @@ -1675,11 +1703,35 @@ public class DFSClient implements FSConstants, java.io.Closeable { chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; + // try reading the block locally. if this fails, then go via + // the datanode + Block blk = targetBlock.getBlock(); + try { + if (LOG.isDebugEnabled()) { + LOG.warn("blockSeekTo shortCircuitLocalReads " + shortCircuitLocalReads + + " localhost " + localHost + + " targetAddr " + targetAddr); + } + if (shortCircuitLocalReads && localHost != null && + (targetAddr.equals(localHost) || + targetAddr.getHostName().startsWith("localhost"))) { + blockReader = BlockReaderLocal.newBlockReader(conf, src, blk, + chosenNode, + offsetIntoBlock, + blk.getNumBytes() - offsetIntoBlock); + return chosenNode; + } + } catch (IOException ex) { + LOG.info("Failed to read block " + targetBlock.getBlock() + + " on local machine " + localHost + + ". Try via the datanode on " + targetAddr + ":" + + StringUtils.stringifyException(ex)); + } + try { s = socketFactory.createSocket(); NetUtils.connect(s, targetAddr, socketTimeout); s.setSoTimeout(socketTimeout); - Block blk = targetBlock.getBlock(); blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), blk.getGenerationStamp(), @@ -1884,25 +1936,43 @@ public class DFSClient implements FSConstants, java.io.Closeable { DatanodeInfo chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; BlockReader reader = null; - - try { - dn = socketFactory.createSocket(); - NetUtils.connect(dn, targetAddr, socketTimeout); - dn.setSoTimeout(socketTimeout); - - int len = (int) (end - start + 1); - - reader = BlockReader.newBlockReader(dn, src, + int len = (int) (end - start + 1); + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("fetchBlockByteRange shortCircuitLocalReads " + + shortCircuitLocalReads + + " localhost " + localHost + + " targetAddr " + targetAddr + + " getHostName: " + targetAddr.getHostName()); + } + // first try reading the block locally.
+ if (shortCircuitLocalReads && localHost != null && + (targetAddr.equals(localHost) || + targetAddr.getHostName().startsWith("localhost"))) { + LOG.debug("localBlock reader: " + block); + reader = BlockReaderLocal.newBlockReader(conf, src, + block.getBlock(), + chosenNode, + start, + len); + } else { + // go to the datanode + dn = socketFactory.createSocket(); + NetUtils.connect(dn, targetAddr, socketTimeout); + dn.setSoTimeout(socketTimeout); + reader = BlockReader.newBlockReader(dn, src, block.getBlock().getBlockId(), block.getBlock().getGenerationStamp(), start, len, buffersize, verifyChecksum, clientName); - int nread = reader.readAll(buf, offset, len); - if (nread != len) { - throw new IOException("truncated return from reader.read(): " + - "excpected " + len + ", got " + nread); - } - return; + } + int nread = reader.readAll(buf, offset, len); + if (nread != len) { + throw new IOException("truncated return from reader.read(): " + + "expected " + len + ", got " + nread); + } + return; } catch (ChecksumException e) { LOG.warn("fetchBlockByteRange(). Got a checksum exception for " + src + " at " + block.getBlock() + ":" + @@ -2969,7 +3039,8 @@ public class DFSClient implements FSConstants, java.io.Closeable { } catch (IOException ie) { - LOG.info("Exception in createBlockOutputStream " + ie); + LOG.info("Exception in createBlockOutputStream " + nodes[0].getName() + " " + + ie); IOUtils.closeSocket(s); // find the datanode that matches diff --git a/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java b/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java index 129fbce..48983c6 100644 --- a/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -179,6 +179,17 @@ public class DistributedFileSystem extends FileSystem { dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics)); } + /** + * Start the lease recovery of a file + * + * @param f a file + * @return true if the file is already closed + * @throws IOException if an error occurs + */ + public boolean recoverLease(Path f) throws IOException { + return dfs.recoverLease(getPathName(f)); + } + /** This optional operation is not yet supported. */ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { diff --git a/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index f6d4fcc..0aa8020 100644 --- a/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -40,9 +40,9 @@ public interface ClientProtocol extends VersionedProtocol { * Compared to the previous version the following changes have been introduced: * (Only the latest change is reflected. * The log of historical changes can be retrieved from the svn). - * 41: saveNamespace introduced. + * 43: recoverLease returns whether the file is closed or not */ - public static final long versionID = 41L; + public static final long versionID = 43L; /////////////////////////////////////// // File contents /////////////////////////////////////// @@ -122,6 +122,16 @@ * @throws IOException if other errors occur. */ public LocatedBlock append(String src, String clientName) throws IOException; + + /** + * Start lease recovery + * + * @param src path of the file to start lease recovery + * @param clientName name of the current client + * @return true if the file is already closed + * @throws IOException + */ + public boolean recoverLease(String src, String clientName) throws IOException;
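Because the DFSClient wrapper earlier in this patch unwraps FileNotFoundException and AccessControlException from RemoteException (and FSNamesystem.recoverLease below checks WRITE access on the path), callers of the public API can catch both directly. A brief illustrative sketch, not part of the patch; the path is hypothetical and the org.apache.hadoop.security.AccessControlException import is assumed to match the one DFSClient uses:

  import java.io.FileNotFoundException;
  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.DistributedFileSystem;
  import org.apache.hadoop.security.AccessControlException;

  public class RecoverLeaseErrors {
    public static void main(String[] args) throws IOException {
      DistributedFileSystem dfs =
          (DistributedFileSystem) FileSystem.get(new Configuration());
      try {
        dfs.recoverLease(new Path("/no/such/file"));  // hypothetical path
      } catch (FileNotFoundException fnfe) {
        // the NameNode has no file at this path
      } catch (AccessControlException ace) {
        // caller lacks WRITE permission on the path (permission checking enabled)
      }
    }
  }
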
/** * Set replication for an existing file. diff --git a/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java index a1eaf45..24ffc57 100644 --- a/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1699,7 +1699,7 @@ public class DataNode extends Configured List successList = new ArrayList(); - long generationstamp = namenode.nextGenerationStamp(block); + long generationstamp = namenode.nextGenerationStamp(block, closeFile); Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp); for(BlockRecord r : syncList) { diff --git a/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index aeb40c4..f0a7739 100644 --- a/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -898,6 +898,23 @@ class FSDirectory implements FSConstants, Closeable { return fullPathName.toString(); } + /** Return the full path name of the specified inode */ + static String getFullPathName(INode inode) { + // calculate the depth of this inode from root + int depth = 0; + for (INode i = inode; i != null; i = i.parent) { + depth++; + } + INode[] inodes = new INode[depth]; + + // fill up the inodes in the path from this inode to root + for (int i = 0; i < depth; i++) { + inodes[depth-i-1] = inode; + inode = inode.parent; + } + return getFullPathName(inodes, depth-1); + } + /** * Create a directory * If ancestor directories do not exist, automatically create them. diff --git a/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 9038b63..1314e3f 100644 --- a/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1051,51 +1051,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean { try { INode myFile = dir.getFileINode(src); - if (myFile != null && myFile.isUnderConstruction()) { - INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile; - // - // If the file is under construction , then it must be in our - // leases. Find the appropriate lease record. - // - Lease lease = leaseManager.getLease(holder); - // - // We found the lease for this file. And surprisingly the original - // holder is trying to recreate this file. This should never occur. - // - if (lease != null) { - Lease leaseFile = leaseManager.getLeaseByPath(src); - if (leaseFile != null && leaseFile.equals(lease)) { - throw new AlreadyBeingCreatedException( - "failed to create file " + src + " for " + holder + - " on client " + clientMachine + - " because current leaseholder is trying to recreate file."); - } - } - // - // Find the original holder.
- // lease = leaseManager.getLease(pendingFile.clientName); - if (lease == null) { - throw new AlreadyBeingCreatedException( - "failed to create file " + src + " for " + holder + - " on client " + clientMachine + - " because pendingCreates is non-null but no leases found."); - } - // - // If the original holder has not renewed in the last SOFTLIMIT - // period, then start lease recovery. - // - if (lease.expiredSoftLimit()) { - LOG.info("startFile: recover lease " + lease + ", src=" + src + - " from client " + pendingFile.clientName); - internalReleaseLease(lease, src); - } - throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder + - " on client " + clientMachine + - ", because this file is already being created by " + - pendingFile.getClientName() + - " on " + pendingFile.getClientMachine()); - } + recoverLeaseInternal(myFile, src, holder, clientMachine, false); try { verifyReplication(src, replication, clientMachine); @@ -1170,6 +1126,103 @@ } /** + * Recover lease. + * Immediately revoke the lease of the current lease holder and start lease + * recovery so that the file can be forced to be closed. + * + * @param src the path of the file to start lease recovery + * @param holder the lease holder's name + * @param clientMachine the client machine's name + * @return true if the file is already closed + * @throws IOException + */ + synchronized boolean recoverLease(String src, String holder, String clientMachine) + throws IOException { + if (isInSafeMode()) { + throw new SafeModeException( + "Cannot recover the lease of " + src, safeMode); + } + if (!DFSUtil.isValidName(src)) { + throw new IOException("Invalid file name: " + src); + } + + INode inode = dir.getFileINode(src); + if (inode == null) { + throw new FileNotFoundException("File not found " + src); + } + + if (!inode.isUnderConstruction()) { + return true; + } + if (isPermissionEnabled) { + checkPathAccess(src, FsAction.WRITE); + } + + recoverLeaseInternal(inode, src, holder, clientMachine, true); + return false; + } + + private void recoverLeaseInternal(INode fileInode, + String src, String holder, String clientMachine, boolean force) + throws IOException { + if (fileInode != null && fileInode.isUnderConstruction()) { + INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode; + // + // If the file is under construction, then it must be in our + // leases. Find the appropriate lease record. + // + Lease lease = leaseManager.getLease(holder); + // + // We found the lease for this file. And surprisingly the original + // holder is trying to recreate this file. This should never occur. + // + if (!force && lease != null) { + Lease leaseFile = leaseManager.getLeaseByPath(src); + if (leaseFile != null && leaseFile.equals(lease)) { + throw new AlreadyBeingCreatedException( + "failed to create file " + src + " for " + holder + + " on client " + clientMachine + + " because current leaseholder is trying to recreate file."); + } + } + // + // Find the original holder.
+ // lease = leaseManager.getLease(pendingFile.clientName); + if (lease == null) { + throw new AlreadyBeingCreatedException( + "failed to create file " + src + " for " + holder + + " on client " + clientMachine + + " because pendingCreates is non-null but no leases found."); + } + if (force) { + // close now: no need to wait for soft lease expiration and + // close only the file src + LOG.info("recoverLease: recover lease " + lease + ", src=" + src + + " from client " + pendingFile.clientName); + internalReleaseLeaseOne(lease, src); + } else { + // + // If the original holder has not renewed in the last SOFTLIMIT + // period, then start lease recovery. + // + if (lease.expiredSoftLimit()) { + LOG.info("startFile: recover lease " + lease + ", src=" + src + + " from client " + pendingFile.clientName); + internalReleaseLease(lease, src); + } + throw new AlreadyBeingCreatedException( + "failed to create file " + src + " for " + holder + + " on client " + clientMachine + + ", because this file is already being created by " + + pendingFile.getClientName() + + " on " + pendingFile.getClientMachine()); + } + } + + } + + /** * Append to an existing file in the namespace. */ LocatedBlock appendFile(String src, String holder, String clientMachine @@ -4896,8 +4949,12 @@ /** * Verifies that the block is associated with a file that has a lease. * Increments, logs and then returns the stamp - */ - synchronized long nextGenerationStampForBlock(Block block) throws IOException { + * + * @param block block + * @param fromNN if it is for lease recovery initiated by NameNode + * @return a new generation stamp + */ + synchronized long nextGenerationStampForBlock(Block block, boolean fromNN) throws IOException { if (isInSafeMode()) { throw new SafeModeException("Cannot get nextGenStamp for " + block, safeMode); } @@ -4913,6 +4970,15 @@ LOG.info(msg); throw new IOException(msg); } + // Disallow client-initiated recovery once + // NameNode initiated lease recovery starts + if (!fromNN && HdfsConstants.NN_RECOVERY_LEASEHOLDER.equals( + leaseManager.getLeaseByPath(FSDirectory.getFullPathName(fileINode)).getHolder())) { + String msg = block + + " is being recovered by NameNode, ignoring the request from a client"; + LOG.info(msg); + throw new IOException(msg); + } if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) { String msg = block + " is already being recovered, ignoring this request."; LOG.info(msg); diff --git a/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index 61e22eb..1be5453 100644 --- a/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -201,6 +201,12 @@ public class LeaseManager { this.holder = holder; renew(); } + + /** Get the holder of the lease */ + public String getHolder() { + return holder; + } + /** Only LeaseManager object can renew a lease */ private void renew() { this.lastUpdate = FSNamesystem.now(); diff --git a/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 24c2879..196a702 100644 --- a/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -425,6 +425,12 @@ public class NameNode implements
ClientProtocol, DatanodeProtocol, } /** {@inheritDoc} */ + public boolean recoverLease(String src, String clientName) throws IOException { + String clientMachine = getClientMachine(); + return namesystem.recoverLease(src, clientName, clientMachine); + } + + /** {@inheritDoc} */ public boolean setReplication(String src, short replication ) throws IOException { @@ -517,8 +523,8 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, } /** {@inheritDoc} */ - public long nextGenerationStamp(Block block) throws IOException{ - return namesystem.nextGenerationStampForBlock(block); + public long nextGenerationStamp(Block block, boolean fromNN) throws IOException{ + return namesystem.nextGenerationStampForBlock(block, fromNN); } /** {@inheritDoc} */ diff --git a/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 0af90e6..985c2ee 100644 --- a/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -35,10 +35,10 @@ import org.apache.hadoop.ipc.VersionedProtocol; **********************************************************************/ public interface DatanodeProtocol extends VersionedProtocol { /** - * 19: SendHeartbeat returns an array of DatanodeCommand objects - * in stead of a DatanodeCommand object. + * 20: nextGenerationStamp has a new parameter indicating if it is for + * NameNode initiated lease recovery or not */ - public static final long versionID = 19L; + public static final long versionID = 20L; // error code final static int NOTIFY = 0; @@ -142,10 +142,14 @@ public interface DatanodeProtocol extends VersionedProtocol { public void reportBadBlocks(LocatedBlock[] blocks) throws IOException; /** - * @return the next GenerationStamp to be associated with the specified - * block. + * Get the next GenerationStamp to be associated with the specified + * block. 
+ * + * @param block block + * @param fromNN if it is for lease recovery initiated by NameNode + * @return a new generation stamp */ - public long nextGenerationStamp(Block block) throws IOException; + public long nextGenerationStamp(Block block, boolean fromNN) throws IOException; /** * Commit block synchronization in lease recovery diff --git a/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 235f803..f81ec8c 100644 --- a/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -225,6 +225,9 @@ public class TestDFSClientRetries extends TestCase { public void setTimes(String src, long mtime, long atime) throws IOException {} + @Override + public boolean recoverLease(String src, String clientName) throws IOException {return true;} + } public void testNotYetReplicatedErrors() throws IOException diff --git a/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java b/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java index 3fd6cdf..d12c46a 100644 --- a/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java +++ b/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java @@ -43,14 +43,13 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase { static final long BLOCK_SIZE = 1024; static final int FILE_SIZE = 1024*16; static final short REPLICATION_NUM = (short)3; - static byte[] buffer = new byte[FILE_SIZE]; + private static byte[] buffer = new byte[FILE_SIZE]; + private final Configuration conf = new Configuration(); + private final int bufferSize = conf.getInt("io.file.buffer.size", 4096); public void testBlockSynchronization() throws Exception { final long softLease = 1000; final long hardLease = 60 * 60 *1000; - final short repl = 3; - final Configuration conf = new Configuration(); - final int bufferSize = conf.getInt("io.file.buffer.size", 4096); conf.setLong("dfs.block.size", BLOCK_SIZE); conf.setInt("dfs.heartbeat.interval", 1); // conf.setInt("io.bytes.per.checksum", 16); @@ -64,88 +63,148 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase { //create a file DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem(); - // create a random file name - String filestr = "/foo" + AppendTestUtil.nextInt(); - System.out.println("filestr=" + filestr); - Path filepath = new Path(filestr); - FSDataOutputStream stm = dfs.create(filepath, true, - bufferSize, repl, BLOCK_SIZE); - assertTrue(dfs.dfs.exists(filestr)); - - // write random number of bytes into it. int size = AppendTestUtil.nextInt(FILE_SIZE); - System.out.println("size=" + size); - stm.write(buffer, 0, size); - - // sync file - AppendTestUtil.LOG.info("sync"); - stm.sync(); - AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()"); - dfs.dfs.leasechecker.interruptAndJoin(); + Path filepath = createFile(dfs, size, true); // set the soft limit to be 1 second so that the // namenode triggers lease recovery on next attempt to write-for-open. cluster.setLeasePeriod(softLease, hardLease); - // try to re-open the file before closing the previous handle. This - // should fail but will trigger lease recovery. 
- { - Configuration conf2 = new Configuration(conf); - String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1"; - UnixUserGroupInformation.saveToConf(conf2, - UnixUserGroupInformation.UGI_PROPERTY_NAME, - new UnixUserGroupInformation(username, new String[]{"supergroup"})); - FileSystem dfs2 = FileSystem.get(conf2); - - boolean done = false; - for(int i = 0; i < 10 && !done; i++) { - AppendTestUtil.LOG.info("i=" + i); - try { - dfs2.create(filepath, false, bufferSize, repl, BLOCK_SIZE); - fail("Creation of an existing file should never succeed."); - } catch (IOException ioe) { - final String message = ioe.getMessage(); - if (message.contains("file exists")) { - AppendTestUtil.LOG.info("done", ioe); - done = true; - } - else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) { - AppendTestUtil.LOG.info("GOOD! got " + message); - } - else { - AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe); - } - } - - if (!done) { - AppendTestUtil.LOG.info("sleep " + 5000 + "ms"); - try {Thread.sleep(5000);} catch (InterruptedException e) {} - } - } - assertTrue(done); - } - - AppendTestUtil.LOG.info("Lease for file " + filepath + " is recovered. " - + "Validating its contents now..."); - - // verify that file-size matches - assertTrue("File should be " + size + " bytes, but is actually " + - " found to be " + dfs.getFileStatus(filepath).getLen() + - " bytes", - dfs.getFileStatus(filepath).getLen() == size); - - // verify that there is enough data to read. - System.out.println("File size is good. Now validating sizes from datanodes..."); - FSDataInputStream stmin = dfs.open(filepath); - stmin.readFully(0, actual, 0, size); - stmin.close(); + recoverLeaseUsingCreate(filepath); + verifyFile(dfs, filepath, actual, size); + + // test recoverLease + // set the soft limit to be 1 hour but recoverLease should + // close the file immediately + cluster.setLeasePeriod(hardLease, hardLease); + size = AppendTestUtil.nextInt(FILE_SIZE); + filepath = createFile(dfs, size, false); + + // test recoverLease from a different client + recoverLease(filepath, null); + verifyFile(dfs, filepath, actual, size); + + // test recoverLease from the same client + size = AppendTestUtil.nextInt(FILE_SIZE); + filepath = createFile(dfs, size, false); + + // create another file using the same client + Path filepath1 = new Path("/foo" + AppendTestUtil.nextInt()); + FSDataOutputStream stm = dfs.create(filepath1, true, + bufferSize, REPLICATION_NUM, BLOCK_SIZE); + + // recover the first file + recoverLease(filepath, dfs); + verifyFile(dfs, filepath, actual, size); + + // continue to write to the second file + stm.write(buffer, 0, size); + stm.close(); + verifyFile(dfs, filepath1, actual, size); } finally { try { if (cluster != null) {cluster.getFileSystem().close();cluster.shutdown();} } catch (Exception e) { // ignore } } } + + private void recoverLease(Path filepath, DistributedFileSystem dfs2) throws Exception { + if (dfs2==null) { + Configuration conf2 = new Configuration(conf); + String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1"; + UnixUserGroupInformation.saveToConf(conf2, + UnixUserGroupInformation.UGI_PROPERTY_NAME, + new UnixUserGroupInformation(username, new String[]{"supergroup"})); + dfs2 = (DistributedFileSystem)FileSystem.get(conf2); + } + + while (!dfs2.recoverLease(filepath)) { + AppendTestUtil.LOG.info("sleep " + 5000 + "ms"); + Thread.sleep(5000); + } + } + + // try to re-open the file before closing the
previous handle. This + // should fail but will trigger lease recovery. + private Path createFile(DistributedFileSystem dfs, int size, + boolean triggerSoftLease) throws IOException, InterruptedException { + // create a random file name + String filestr = "/foo" + AppendTestUtil.nextInt(); + System.out.println("filestr=" + filestr); + Path filepath = new Path(filestr); + FSDataOutputStream stm = dfs.create(filepath, true, + bufferSize, REPLICATION_NUM, BLOCK_SIZE); + assertTrue(dfs.dfs.exists(filestr)); + + // write random number of bytes into it. + System.out.println("size=" + size); + stm.write(buffer, 0, size); + + // sync file + AppendTestUtil.LOG.info("sync"); + stm.sync(); + if (triggerSoftLease) { + AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()"); + dfs.dfs.leasechecker.interruptAndJoin(); + } + return filepath; + } + + private void recoverLeaseUsingCreate(Path filepath) throws IOException { + Configuration conf2 = new Configuration(conf); + String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1"; + UnixUserGroupInformation.saveToConf(conf2, + UnixUserGroupInformation.UGI_PROPERTY_NAME, + new UnixUserGroupInformation(username, new String[]{"supergroup"})); + FileSystem dfs2 = FileSystem.get(conf2); + + boolean done = false; + for(int i = 0; i < 10 && !done; i++) { + AppendTestUtil.LOG.info("i=" + i); + try { + dfs2.create(filepath, false, bufferSize, (short)1, BLOCK_SIZE); + fail("Creation of an existing file should never succeed."); + } catch (IOException ioe) { + final String message = ioe.getMessage(); + if (message.contains("file exists")) { + AppendTestUtil.LOG.info("done", ioe); + done = true; + } + else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) { + AppendTestUtil.LOG.info("GOOD! got " + message); + } + else { + AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe); + } + } + + if (!done) { + AppendTestUtil.LOG.info("sleep " + 5000 + "ms"); + try {Thread.sleep(5000);} catch (InterruptedException e) {} + } + } + assertTrue(done); + + } + + private void verifyFile(FileSystem dfs, Path filepath, byte[] actual, + int size) throws IOException { + AppendTestUtil.LOG.info("Lease for file " + filepath + " is recovered. " + + "Validating its contents now..."); + + // verify that file-size matches + assertTrue("File should be " + size + " bytes, but is actually " + + " found to be " + dfs.getFileStatus(filepath).getLen() + + " bytes", + dfs.getFileStatus(filepath).getLen() == size); + + // verify that there is enough data to read. + System.out.println("File size is good. Now validating sizes from datanodes..."); + FSDataInputStream stmin = dfs.open(filepath); + stmin.readFully(0, actual, 0, size); + stmin.close(); + } } diff --git a/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index 8f1d34e..2722e65 100644 --- a/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -27,7 +27,7 @@ public abstract class NameNodeAdapter { public static long callNextGenerationStampForBlock( FSNamesystem fsn, Block block) throws IOException { - return fsn.nextGenerationStampForBlock(block); + return fsn.nextGenerationStampForBlock(block, false); } } -- 1.6.0.4
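A closing note on usage: the recoverLease helper in TestLeaseRecovery2 above polls in an unbounded loop. Callers that need a deadline might wrap the call as below. A sketch only, not part of the patch; the class name, retry interval, and timeout are all illustrative.

  import java.io.IOException;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.DistributedFileSystem;

  public final class LeaseRecoveryUtil {
    private LeaseRecoveryUtil() {}

    /** Poll recoverLease until the file is closed or the deadline passes;
     *  returns true if the file was closed in time. */
    public static boolean recoverLease(DistributedFileSystem dfs, Path file,
        long timeoutMillis) throws IOException, InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMillis;
      while (!dfs.recoverLease(file)) {  // true once the file is closed
        if (System.currentTimeMillis() >= deadline) {
          return false;  // recovery started but did not finish in time
        }
        Thread.sleep(1000);  // retry interval is illustrative
      }
      return true;
    }
  }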