diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
index 93b8794..6747abe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
@@ -22,6 +22,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -36,6 +37,8 @@
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -44,9 +47,11 @@
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
@@ -106,6 +111,114 @@ public static void tearDown() throws IOException {
   }
 
   /**
+   * Assert we don't lose edits when the lease is taken from a writer.
+   * A background writer appends and syncs edits; the lease is then recovered
+   * by another user and the synced edit count is compared against what can be
+   * read back from the file.
+   * @throws Exception if the writer, lease recovery or read-back fails
+   */
+  @Test
+  public void testNoLostEdits() throws Exception {
+    Path p = new Path("/edits");
+    // Start up a writer thread that adds edits to the 'p' file.
+    EditWriter editWriter = new EditWriter(p);
+    try {
+      editWriter.start();
+      // Wait for the first synced edit, but do not spin forever if the writer
+      // thread died before producing one.
+      while (editWriter.count <= 0 && editWriter.isAlive()) {
+        Thread.sleep(1);
+      }
+      assertTrue("Writer exited before syncing any edit", editWriter.count > 0);
+      // Now recover the lease as another user
+      recoverLease(p, null, 1000);
+    } finally {
+      // Stop the writer and wait for it so that 'count' is final and the
+      // thread does not leak into other tests.
+      editWriter.stop = true;
+      editWriter.join();
+    }
+    // JUnit convention is assertEquals(expected, actual).
+    assertEquals(editWriter.count, countKVs(p));
+  }
+
+  /**
+   * Count the key/value pairs that can be read back from a sequence file.
+   * @param wal File to count edits in.
+   * @return Count of {@link Text} key/values in wal
+   * @throws IOException if the file cannot be opened or read
+   */
+  private int countKVs(final Path wal) throws IOException {
+    // reader.next() reads into the instances it is given, so one key/value
+    // pair can be reused for every record.
+    final Text k = new Text();
+    final Text v = new Text();
+    int count = 0;
+    SequenceFile.Reader reader =
+        new SequenceFile.Reader(conf, SequenceFile.Reader.file(wal));
+    try {
+      while (reader.next(k, v)) {
+        count++;
+      }
+    } catch (EOFException e) {
+      // An EOFException can occur when reading up to the end of the file whose
+      // lease was just taken over; ignore it and report what was read so far.
+    } finally {
+      reader.close();
+    }
+    return count;
+  }
+
+  /**
+   * Writes edits to a file in a thread until {@link #stop} is set.
+   * Read {@link #count} to find how many 'edits' were appended and synced.
+   */
+  private static class EditWriter extends Thread {
+    private volatile boolean stop;
+    private volatile int count;
+    private SequenceFile.Writer writer;
+
+    EditWriter(final Path p) throws IOException {
+      this.writer = SequenceFile.createWriter(conf,
+          SequenceFile.Writer.file(p),
+          SequenceFile.Writer.keyClass(Text.class),
+          SequenceFile.Writer.valueClass(Text.class));
+    }
+
+    @Override
+    public void run() {
+      final Text value = new Text("value");
+      try {
+        // Append edits, sync, and count
+        for (this.count = 0; !this.stop; this.count++) {
+          this.writer.append(new Text(String.format("k%07d", this.count)),
+              value);
+          this.writer.hsync();
+        }
+      } catch (RemoteException e) {
+        if (e.unwrapRemoteException() instanceof LeaseExpiredException) {
+          // Expected once the lease is taken from this writer, so exit
+          // quietly. Null the writer because it cannot be closed any more
+          // now that the lease is lost.
+          this.writer = null;
+        } else {
+          throw new RuntimeException("Unexpected remote exception", e);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("Unexpected", e);
+      } finally {
+        try {
+          if (this.writer != null) {
+            this.writer.close();
+          }
+        } catch (IOException e) {
+          throw new RuntimeException("Unexpected on close", e);
+        }
+      }
+    }
+  }
+
+  /**
    * Test the NameNode's revoke lease on current lease holder function.
    * @throws Exception
    */
@@ -176,16 +289,36 @@ private Path createFile(final String filestr, final int size,
     return filepath;
   }
 
-  private void recoverLease(Path filepath, DistributedFileSystem dfs)
+  /**
+   * Recover the lease on a file, retrying every 5000ms until it succeeds.
+   * @param filepath file whose lease should be recovered
+   * @param dfs file system to recover with, or null to use another user's
+   * @return the file system the lease was recovered with
+   */
+  private DistributedFileSystem recoverLease(Path filepath,
+      DistributedFileSystem dfs) throws Exception {
+    return recoverLease(filepath, dfs, 5000);
+  }
+
+  /**
+   * Recover the lease on a file, sleeping between attempts until it succeeds.
+   * @param filepath file whose lease should be recovered
+   * @param dfs file system to recover with, or null to use another user's
+   * @param sleepTimeInMillis how long to sleep between attempts
+   * @return the file system the lease was recovered with
+   */
+  private DistributedFileSystem recoverLease(Path filepath,
+      DistributedFileSystem dfs, final int sleepTimeInMillis)
   throws Exception {
     if (dfs == null) {
       dfs = (DistributedFileSystem)getFSAsAnotherUser(conf);
     }
 
     while (!dfs.recoverLease(filepath)) {
-      AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
-      Thread.sleep(5000);
+      AppendTestUtil.LOG.info("sleep " + sleepTimeInMillis + "ms");
+      Thread.sleep(sleepTimeInMillis);
     }
+    return dfs;
   }
 
   private FileSystem getFSAsAnotherUser(final Configuration c)