diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 7569992..528bccb 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Matcher;
@@ -146,9 +145,6 @@ public class HLog implements Syncable {
   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
   final static Object [] NO_ARGS = new Object []{};
 
-  // used to indirectly tell syncFs to force the sync
-  private boolean forceSync = false;
-
   public interface Reader {
     void init(FileSystem fs, Path path, Configuration c) throws IOException;
     void close() throws IOException;
@@ -494,7 +490,7 @@
       OutputStream nextHdfsOut = null;
       if (nextWriter instanceof SequenceFileLogWriter) {
         nextHdfsOut =
-          ((SequenceFileLogWriter)nextWriter).getDFSCOutputStream();
+          ((SequenceFileLogWriter)nextWriter).getDFSClientOutputStream();
       }
       // Tell our listeners that a new log was created
       if (!this.listeners.isEmpty()) {
@@ -950,8 +946,6 @@
 
     private final long optionalFlushInterval;
 
-    private boolean syncerShuttingDown = false;
-
     LogSyncer(long optionalFlushInterval) {
       this.optionalFlushInterval = optionalFlushInterval;
     }
@@ -972,7 +966,6 @@
       } catch (InterruptedException e) {
         LOG.debug(getName() + " interrupted while waiting for sync requests");
       } finally {
-        syncerShuttingDown = true;
         LOG.info(getName() + " exiting");
       }
     }
@@ -998,7 +991,6 @@
           }
         }
       }
-
     } catch (IOException e) {
       LOG.fatal("Could not append. Requesting close of hlog", e);
       requestLogRoll();
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index 8dc9a5e..ce897d2 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -45,11 +45,12 @@ public class SequenceFileLogWriter implements HLog.Writer {
   private SequenceFile.Writer writer;
   // The dfsclient out stream gotten made accessible or null if not available.
   private OutputStream dfsClient_out;
-  // The syncFs method from hdfs-200 or null if not available.
-  private Method syncFs;
 
   private Class keyClass;
 
+  private Method syncFs = null;
+  private Method hflush = null;
+
   /**
    * Default constructor.
    */
@@ -69,7 +70,7 @@
 
   @Override
   public void init(FileSystem fs, Path path, Configuration conf)
-  throws IOException {
+  throws IOException {
     if (null == keyClass) {
       keyClass = HLog.getKeyClass(conf);
     }
@@ -87,10 +88,62 @@
         new DefaultCodec(),
         null,
         new Metadata());
+
+    makeSequenceFilePrivateFSDataOutputStreamAccessible();
+    this.syncFs = getSyncFs();
+    this.hflush = getHFlush();
+  }
 
-    // Get at the private FSDataOutputStream inside in SequenceFile so we can
-    // call sync on it. Make it accessible. Stash it aside for call up in
-    // the sync method.
+  /**
+   * Now do dirty work to see if syncFs is available on the backing this.writer.
+   * It will be available in branch-0.20-append and in CDH3.
+   * @return The syncFs method or null if not available.
+   * @throws IOException
+   */
+  private Method getSyncFs()
+  throws IOException {
+    Method m = null;
+    try {
+      // function pointer to writer.syncFs()
+      m = this.writer.getClass().getMethod("syncFs", new Class []{});
+    } catch (SecurityException e) {
+      throw new IOException("Failed test for syncfs", e);
+    } catch (NoSuchMethodException e) {
+      // Not available
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug((m != null)? "Using syncFs -- HDFS-200":
+        "syncFs -- HDFS-200 -- not available");
+    }
+    return m;
+  }
+
+  /**
+   * See if hflush (0.21 and 0.22 hadoop) is available.
+   * @return The hflush method or null if not available.
+   * @throws IOException
+   */
+  private Method getHFlush()
+  throws IOException {
+    Method m = null;
+    try {
+      m = getDFSClientOutputStream().getClass().getMethod("hflush", new Class []{});
+    } catch (SecurityException e) {
+      throw new IOException("Failed test for hflush", e);
+    } catch (NoSuchMethodException e) {
+      // Not available
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug((m != null)? "Using hflush":
+        "hflush not available");
+    }
+    return m;
+  }
+
+  // Get at the private FSDataOutputStream inside in SequenceFile so we can
+  // call sync on it. Make it accessible.
+  private void makeSequenceFilePrivateFSDataOutputStreamAccessible()
+  throws IOException {
     final Field fields [] = this.writer.getClass().getDeclaredFields();
     final String fieldName = "out";
     for (int i = 0; i < fields.length; ++i) {
@@ -107,25 +160,6 @@
         }
       }
     }
-
-    // Now do dirty work to see if syncFs is available.
-    // Test if syncfs is available.
-    Method m = null;
-    boolean append = conf.getBoolean("dfs.support.append", false);
-    if (append) {
-      try {
-        // function pointer to writer.syncFs()
-        m = this.writer.getClass().getMethod("syncFs", new Class []{});
-      } catch (SecurityException e) {
-        throw new IOException("Failed test for syncfs", e);
-      } catch (NoSuchMethodException e) {
-        // Not available
-      }
-    }
-    this.syncFs = m;
-    LOG.info((this.syncFs != null)?
-      "Using syncFs -- HDFS-200":
-      ("syncFs -- HDFS-200 -- not available, dfs.support.append=" + append));
   }
 
   @Override
@@ -146,6 +180,12 @@
       } catch (Exception e) {
         throw new IOException("Reflection", e);
      }
+    } else if (this.hflush != null) {
+      try {
+        this.hflush.invoke(getDFSClientOutputStream(), HLog.NO_ARGS);
+      } catch (Exception e) {
+        throw new IOException("Reflection", e);
+      }
     }
   }
 
@@ -158,7 +198,7 @@
    * @return The dfsclient out stream up inside SF.Writer made accessible, or
    * null if not available.
    */
-  public OutputStream getDFSCOutputStream() {
+  public OutputStream getDFSClientOutputStream() {
     return this.dfsClient_out;
   }
-}
+}
\ No newline at end of file
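
Note on the pattern above (reviewer commentary, not part of the patch): getSyncFs() and
getHFlush() both follow the same capability-probing idiom -- look a method up once via
reflection, cache the java.lang.reflect.Method handle, and treat NoSuchMethodException as
"not available" so the caller can fall back. A minimal self-contained sketch of that idiom
follows; CapabilityProbe and probe() are illustrative names, not HBase or Hadoop APIs.

import java.io.IOException;
import java.lang.reflect.Method;

public class CapabilityProbe {
  // Probe target for a public no-arg method; null means "not available",
  // mirroring how getSyncFs()/getHFlush() in the patch degrade gracefully.
  static Method probe(Object target, String name) throws IOException {
    try {
      return target.getClass().getMethod(name, new Class[] {});
    } catch (SecurityException e) {
      throw new IOException("Failed test for " + name, e);
    } catch (NoSuchMethodException e) {
      return null; // this runtime does not offer the method
    }
  }

  public static void main(String[] args) throws Exception {
    Object out = new java.io.ByteArrayOutputStream();
    Method flush = probe(out, "flush");   // exists on any OutputStream
    Method hflush = probe(out, "hflush"); // absent outside HDFS output streams
    System.out.println("flush available:  " + (flush != null));  // true
    System.out.println("hflush available: " + (hflush != null)); // false
    if (flush != null) {
      flush.invoke(out, new Object[] {}); // same shape as syncFs.invoke(...)
    }
  }
}

Run against a plain ByteArrayOutputStream this prints flush available: true and
hflush available: false, which is the branch structure the patched sync() relies on:
invoke syncFs when present, else hflush, else do nothing.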