diff --git a/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java b/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
index 6e29390..39d9497 100644
--- a/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
+++ b/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;

+import java.io.EOFException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -27,10 +28,14 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;

@@ -47,11 +52,116 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
   private static byte[] buffer = new byte[FILE_SIZE];
   private final Configuration conf = new Configuration();
   private final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
-
+
   static private String fakeUsername = "fakeUser1";
   static private String fakeGroup = "supergroup";

-  public void testBlockSynchronization() throws Exception {
+  /**
+   * Asserts that the edits counted by the writer can all be read back after its lease is recovered.
+   * @throws Exception if the cluster cannot be started, or lease recovery or the read-back fails
+   * @throws InterruptedException if one of the {@code Thread.sleep} waits is interrupted
+   */
+  public void testNoLostEdits() throws Exception, InterruptedException {
+    // Same as for next test.
+    conf.setLong("dfs.block.size", BLOCK_SIZE);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    Path p = new Path("/edits");
+    EditWriter editWriter = null;
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster(conf, 5, true, null);
+      cluster.waitActive();
+      // Start up a writer thread that adds edits to the 'p' file.
+      editWriter = new EditWriter(cluster.getFileSystem(), conf, p);
+      editWriter.start();
+      while (editWriter.count <= 0) Thread.sleep(1);
+      System.out.println("Edits writer started");
+      // Now recover the lease. NOTE(review): this goes through cluster.getFileSystem(), not a separate user - confirm intent.
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      while (!dfs.recoverLease(p)) Thread.sleep(10);
+      Thread.sleep(1000);
+      // recoverLease(p, null, 1000);
+      System.out.println("Calling counts");
+      int readCount = countKVs(cluster.getFileSystem(), p);
+      System.out.println("After calling counts");
+      assertEquals(editWriter.count, readCount);
+    } finally {
+      System.out.println("Test DONE!");
+      if (editWriter != null) editWriter.stop = true; // still null when cluster startup failed; an NPE here would mask that failure
+      try {
+        if (cluster != null) {cluster.getFileSystem().close();cluster.shutdown();}
+      } catch (Exception e) {
+        // Best-effort cleanup: a shutdown failure must not replace the test's own result.
+      }
+    }
+  }
+
+  /**
+   * @param fs FileSystem to open the file on
+   * @param wal File to count edits in.
+   * @return Count of {@link Text} keys read from wal before the end of the file (or an EOFException)
+   * @throws IOException if opening, reading or closing fails; an EOFException while reading is ignored
+   */
+  private int countKVs(final FileSystem fs, final Path wal) throws IOException {
+    int count = 0;
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, wal, conf);
+    Text k = new Text();
+    try {
+      while (reader.next(k)) {
+        k = new Text();
+        count++;
+      }
+    } catch (EOFException e) {
+      // Reading up to the end of a file whose lease was just taken over can hit EOF; keep the count read so far.
+    } finally {
+      reader.close();
+    }
+    return count;
+  }
+
+  /**
+   * Writes edits to a file in a thread until {@link #stop} is set to true.
+   * Read {@link #count} to find how many 'edits' we wrote.
+   */
+  private static class EditWriter extends Thread {
+    private volatile boolean stop; // set to true by the test thread to make run() leave its loop
+    private volatile int count = 0; // incremented by run() after each append + syncFs; read by the test thread
+    private SequenceFile.Writer writer; // set to null by run() once the lease has been lost
+
+    EditWriter(final FileSystem fs, final Configuration conf, final Path p) throws IOException {
+      this.writer = SequenceFile.createWriter(fs, conf, p, Text.class, Text.class);
+    }
+
+    public void run() {
+      final Text value = new Text("value");
+      try {
+        // Append an edit, call syncFs(), and only then count it, so count never includes an edit whose syncFs() failed.
+        while (!this.stop) {
+          this.writer.append(new Text(String.format("k%07d", this.count)), value);
+          this.writer.syncFs();
+          this.count++;
+        }
+      } catch (RemoteException e) {
+        if (e.unwrapRemoteException() instanceof LeaseExpiredException) {
+          // Expected once the lease has been taken from this writer, so just exit quietly.
+          // Null the writer so the finally block skips close(); the original author expects close() to fail without the lease.
+          this.writer = null;
+        } else {
+          throw new RuntimeException("Unexpected remote exception", e);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("Unexpected", e);
+      } finally {
+        try {
+          if (this.writer != null) this.writer.close();
+        } catch (IOException e) {
+          throw new RuntimeException("Unexpected on close", e);
+        }
+      }
+    }
+  }
+
+  public void igore_testBlockSynchronization() throws Exception { // NOTE(review): no longer starts with "test", so JUnit 3 presumably skips it; "igore" looks like a typo for "ignore"
     final long softLease = 1000;
     final long hardLease = 60 * 60 *1000;
     conf.setLong("dfs.block.size", BLOCK_SIZE);
@@ -98,12 +208,12 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
       // test recoverlease from the same client
       size = AppendTestUtil.nextInt(FILE_SIZE);
       filepath = createFile(dfs, size, false);
-
+
       // create another file using the same client
       Path filepath1 = new Path("/foo" + AppendTestUtil.nextInt());
       FSDataOutputStream stm = dfs.create(filepath1, true,
           bufferSize, REPLICATION_NUM, BLOCK_SIZE);
-
+
       // recover the first file
       recoverLease(filepath, dfs);
       verifyFile(dfs, filepath, actual, size);
@@ -121,21 +231,27 @@ public class TestLeaseRecovery2 extends
junit.framework.TestCase {
       }
     }
   }
-
-  private void recoverLease(Path filepath, DistributedFileSystem dfs2) throws Exception {
+
+  private void recoverLease(Path filepath, DistributedFileSystem dfs)
+  throws Exception {
+    recoverLease(filepath, dfs, 5000); // 5000 ms between retries, the interval that used to be hard-coded
+  }
+
+  private void recoverLease(Path filepath, DistributedFileSystem dfs2, final int sleepTimeInMillis) // a null dfs2 is replaced by a client created for fakeUsername
+  throws Exception {
     if (dfs2==null) {
-      UserGroupInformation ugi =
-        UserGroupInformation.createUserForTesting(fakeUsername,
+      UserGroupInformation ugi =
+        UserGroupInformation.createUserForTesting(fakeUsername,
             new String [] { fakeGroup});
       dfs2 = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(ugi, conf);
     }
-
+
     while (!dfs2.recoverLease(filepath)) {
-      AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
-      Thread.sleep(5000);
+      AppendTestUtil.LOG.info("sleep " + sleepTimeInMillis + "ms");
+      Thread.sleep(sleepTimeInMillis); // wait, then ask again until recoverLease() returns true
     }
   }
-
+
   // try to re-open the file before closing the previous handle. This
   // should fail but will trigger lease recovery.
   private Path createFile(DistributedFileSystem dfs, int size,
@@ -161,11 +277,11 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
     }
     return filepath;
   }
-
+
   private void recoverLeaseUsingCreate(Path filepath)
   throws IOException, InterruptedException {
-    UserGroupInformation ugi =
-      UserGroupInformation.createUserForTesting(fakeUsername,
+    UserGroupInformation ugi =
+      UserGroupInformation.createUserForTesting(fakeUsername,
           new String [] { fakeGroup});
     FileSystem dfs2 = DFSTestUtil.getFileSystemAs(ugi, conf);

@@ -197,7 +313,7 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {

     assertTrue(done);
   }
-
+
   private void verifyFile(FileSystem dfs, Path filepath, byte[] actual,
       int size) throws IOException {
     AppendTestUtil.LOG.info("Lease for file " +  filepath + " is recovered. "