Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (revision 1341138) +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (working copy) @@ -330,123 +330,6 @@ } } - // For this test to pass, requires: - // 1. HDFS-200 (append support) - // 2. HDFS-988 (SafeMode should freeze file operations - // [FSNamesystem.nextGenerationStampForBlock]) - // 3. HDFS-142 (on restart, maintain pendingCreates) - @Test - public void testAppendClose() throws Exception { - byte [] tableName = Bytes.toBytes(getName()); - HRegionInfo regioninfo = new HRegionInfo(tableName, - HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false); - Path subdir = new Path(dir, "hlogdir"); - Path archdir = new Path(dir, "hlogdir_archive"); - HLog wal = new HLog(fs, subdir, archdir, conf); - final int total = 20; - HTableDescriptor htd = new HTableDescriptor(); - htd.addFamily(new HColumnDescriptor(tableName)); - - for (int i = 0; i < total; i++) { - WALEdit kvs = new WALEdit(); - kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName)); - wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd); - } - // Now call sync to send the data to HDFS datanodes - wal.sync(); - int namenodePort = cluster.getNameNodePort(); - final Path walPath = wal.computeFilename(); - - - // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster) - try { - DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem(); - dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER); - cluster.shutdown(); - try { - // wal.writer.close() will throw an exception, - // but still call this since it closes the LogSyncer thread first - wal.close(); - } catch (IOException e) { - LOG.info(e); - } - fs.close(); // closing FS last so DFSOutputStream can't call close - LOG.info("STOPPED first instance of the cluster"); - } finally { - // Restart the cluster - while (cluster.isClusterUp()){ - LOG.error("Waiting for cluster to go down"); - Thread.sleep(1000); - } - - // Workaround a strange issue with Hadoop's RPC system - if we don't - // sleep here, the new datanodes will pick up a cached IPC connection to - // the old (dead) NN and fail to start. Sleeping 2 seconds goes past - // the idle time threshold configured in the conf above - Thread.sleep(2000); - - cluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null); - cluster.waitActive(); - fs = cluster.getFileSystem(); - LOG.info("START second instance."); - } - - // set the lease period to be 1 second so that the - // namenode triggers lease recovery upon append request - Method setLeasePeriod = cluster.getClass() - .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE}); - setLeasePeriod.setAccessible(true); - setLeasePeriod.invoke(cluster, - new Object[]{new Long(1000), new Long(1000)}); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - LOG.info(e); - } - - // Now try recovering the log, like the HMaster would do - final FileSystem recoveredFs = fs; - final Configuration rlConf = conf; - - class RecoverLogThread extends Thread { - public Exception exception = null; - public void run() { - try { - FSUtils.getInstance(fs, rlConf) - .recoverFileLease(recoveredFs, walPath, rlConf); - } catch (IOException e) { - exception = e; - } - } - } - - RecoverLogThread t = new RecoverLogThread(); - t.start(); - // Timeout after 60 sec. Without correct patches, would be an infinite loop - t.join(60 * 1000); - if(t.isAlive()) { - t.interrupt(); - throw new Exception("Timed out waiting for HLog.recoverLog()"); - } - - if (t.exception != null) - throw t.exception; - - // Make sure you can read all the content - SequenceFile.Reader reader - = new SequenceFile.Reader(this.fs, walPath, this.conf); - int count = 0; - HLogKey key = HLog.newKey(conf); - WALEdit val = new WALEdit(); - while (reader.next(key, val)) { - count++; - assertTrue("Should be one KeyValue per WALEdit", - val.getKeyValues().size() == 1); - } - assertEquals(total, count); - reader.close(); - } - /** * Tests that we can write out an edit, close, and then read it back in again. * @throws IOException @@ -681,7 +564,124 @@ Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName()); assertNotNull(c); } + // For this test to pass, requires: + // 1. HDFS-200 (append support) + // 2. HDFS-988 (SafeMode should freeze file operations + // [FSNamesystem.nextGenerationStampForBlock]) + // 3. HDFS-142 (on restart, maintain pendingCreates) + @Test + public void testAppendClose() throws Exception { + byte [] tableName = Bytes.toBytes(getName()); + HRegionInfo regioninfo = new HRegionInfo(tableName, + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false); + Path subdir = new Path(dir, "hlogdir"); + Path archdir = new Path(dir, "hlogdir_archive"); + HLog wal = new HLog(fs, subdir, archdir, conf); + final int total = 20; + HTableDescriptor htd = new HTableDescriptor(); + htd.addFamily(new HColumnDescriptor(tableName)); + + for (int i = 0; i < total; i++) { + WALEdit kvs = new WALEdit(); + kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName)); + wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd); + } + // Now call sync to send the data to HDFS datanodes + wal.sync(); + int namenodePort = cluster.getNameNodePort(); + final Path walPath = wal.computeFilename(); + + + // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster) + try { + DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem(); + dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER); + cluster.shutdown(); + try { + // wal.writer.close() will throw an exception, + // but still call this since it closes the LogSyncer thread first + wal.close(); + } catch (IOException e) { + LOG.info(e); + } + fs.close(); // closing FS last so DFSOutputStream can't call close + LOG.info("STOPPED first instance of the cluster"); + } finally { + // Restart the cluster + while (cluster.isClusterUp()){ + LOG.error("Waiting for cluster to go down"); + Thread.sleep(1000); + } + + // Workaround a strange issue with Hadoop's RPC system - if we don't + // sleep here, the new datanodes will pick up a cached IPC connection to + // the old (dead) NN and fail to start. Sleeping 2 seconds goes past + // the idle time threshold configured in the conf above + Thread.sleep(2000); + + cluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null); + cluster.waitActive(); + fs = cluster.getFileSystem(); + LOG.info("START second instance."); + } + + // set the lease period to be 1 second so that the + // namenode triggers lease recovery upon append request + Method setLeasePeriod = cluster.getClass() + .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE}); + setLeasePeriod.setAccessible(true); + setLeasePeriod.invoke(cluster, + new Object[]{new Long(1000), new Long(1000)}); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.info(e); + } + + // Now try recovering the log, like the HMaster would do + final FileSystem recoveredFs = fs; + final Configuration rlConf = conf; + + class RecoverLogThread extends Thread { + public Exception exception = null; + public void run() { + try { + FSUtils.getInstance(fs, rlConf) + .recoverFileLease(recoveredFs, walPath, rlConf); + } catch (IOException e) { + exception = e; + } + } + } + + RecoverLogThread t = new RecoverLogThread(); + t.start(); + // Timeout after 60 sec. Without correct patches, would be an infinite loop + t.join(60 * 1000); + if(t.isAlive()) { + t.interrupt(); + throw new Exception("Timed out waiting for HLog.recoverLog()"); + } + + if (t.exception != null) + throw t.exception; + + // Make sure you can read all the content + SequenceFile.Reader reader + = new SequenceFile.Reader(this.fs, walPath, this.conf); + int count = 0; + HLogKey key = HLog.newKey(conf); + WALEdit val = new WALEdit(); + while (reader.next(key, val)) { + count++; + assertTrue("Should be one KeyValue per WALEdit", + val.getKeyValues().size() == 1); + } + assertEquals(total, count); + reader.close(); + } + private void addEdits(HLog log, HRegionInfo hri, byte [] tableName, int times) throws IOException { HTableDescriptor htd = new HTableDescriptor();