Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java	(revision 988213)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java	(working copy)
@@ -334,7 +334,7 @@
       wal.append(regioninfo, tableName, kvs, System.currentTimeMillis());
     }
     // Now call sync to send the data to HDFS datanodes
-    wal.sync(true);
+    wal.sync();
     int namenodePort = cluster.getNameNodePort();
     final Path walPath = wal.computeFilename();
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(revision 988213)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(working copy)
@@ -141,10 +141,10 @@
   private final long blocksize;
   private final int flushlogentries;
   private final String prefix;
-  private final AtomicInteger unflushedEntries = new AtomicInteger(0);
   private final Path oldLogDir;
   private final List actionListeners =
     Collections.synchronizedList(new ArrayList());
+  private boolean logRollRequested;
 
   private static Class logWriterClass;
@@ -212,6 +212,7 @@
 
   // We synchronize on updateLock to prevent updates and to prevent a log roll
   // during an update
+  // locked during appends
   private final Object updateLock = new Object();
 
   private final boolean enabled;
@@ -224,7 +225,7 @@
 
   private final int maxLogs;
 
   /**
-   * Thread that handles group commit
+   * Thread that handles optional sync'ing
    */
   private final LogSyncer logSyncerThread;
@@ -473,6 +474,7 @@
           this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
         "New hlog " + FSUtils.getPath(newPath));
       this.numEntries.set(0);
+      this.logRollRequested = false;
     }
     // Tell our listeners that a new log was created
     if (!this.actionListeners.isEmpty()) {
@@ -798,12 +800,11 @@
       // is greater than or equal to the value in lastSeqWritten.
       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
       doWrite(regionInfo, logKey, logEdit);
-      this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
     }
 
     // sync txn to file system
-    this.sync(regionInfo.isMetaRegion());
+    this.sync();
   }
 
   /**
@@ -849,12 +850,13 @@
       HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
       doWrite(info, logKey, edits);
       this.numEntries.incrementAndGet();
-
-      // Only count 1 row as an unflushed entry.
-      this.unflushedEntries.incrementAndGet();
     }
-    // sync txn to file system
-    this.sync(info.isMetaRegion());
+    // Sync if catalog region, and if not then check if that table supports
+    // deferred log flushing
+    if (info.isMetaRegion() ||
+        !info.getTableDesc().isDeferredLogFlush()) {
+      // sync txn to file system
+      this.sync();
+    }
   }
 
   /**
@@ -863,15 +865,6 @@
    */
   class LogSyncer extends Thread {
 
-    // Using fairness to make sure locks are given in order
-    private final ReentrantLock lock = new ReentrantLock(true);
-
-    // Condition used to wait until we have something to sync
-    private final Condition queueEmpty = lock.newCondition();
-
-    // Condition used to signal that the sync is done
-    private final Condition syncDone = lock.newCondition();
-
     private final long optionalFlushInterval;
 
     private boolean syncerShuttingDown = false;
@@ -883,28 +876,12 @@
     @Override
     public void run() {
       try {
-        lock.lock();
         // awaiting with a timeout doesn't always
        // throw exceptions on interrupt
         while(!this.isInterrupted()) {
 
-          // Wait until something has to be hflushed or do it if we waited
-          // enough time (useful if something appends but does not hflush).
-          // 0 or less means that it timed out and maybe waited a bit more.
-          if (!(queueEmpty.awaitNanos(
-              this.optionalFlushInterval*1000000) <= 0)) {
-            forceSync = true;
-          }
-
-          // We got the signal, let's hflush. We currently own the lock so new
-          // writes are waiting to acquire it in addToSyncQueue while the ones
-          // we hflush are waiting on await()
-          hflush();
-
-          // Release all the clients waiting on the hflush. Notice that we still
-          // own the lock until we get back to await at which point all the
-          // other threads waiting will first acquire and release locks
-          syncDone.signalAll();
+          Thread.sleep(this.optionalFlushInterval);
+          sync();
         }
       } catch (IOException e) {
         LOG.error("Error while syncing, requesting close of hlog ", e);
@@ -913,104 +890,60 @@
         LOG.debug(getName() + "interrupted while waiting for sync requests");
       } finally {
         syncerShuttingDown = true;
-        syncDone.signalAll();
-        lock.unlock();
         LOG.info(getName() + " exiting");
       }
     }
-
-    /**
-     * This method first signals the thread that there's a sync needed
-     * and then waits for it to happen before returning.
-     */
-    public void addToSyncQueue(boolean force) {
-
-      // Don't bother if somehow our append was already hflushed
-      if (unflushedEntries.get() == 0) {
-        return;
-      }
-      lock.lock();
-      try {
-        if (syncerShuttingDown) {
-          LOG.warn(getName() + " was shut down while waiting for sync");
-          return;
-        }
-        if(force) {
-          forceSync = true;
-        }
-        // Wake the thread
-        queueEmpty.signal();
-
-        // Wait for it to hflush
-        syncDone.await();
-      } catch (InterruptedException e) {
-        LOG.debug(getName() + " was interrupted while waiting for sync", e);
-      }
-      finally {
-        lock.unlock();
-      }
-    }
   }
 
-  public void sync(){
-    sync(false);
-  }
-
-  /**
-   * This method calls the LogSyncer in order to group commit the sync
-   * with other threads.
-   * @param force For catalog regions, force the sync to happen
-   */
-  public void sync(boolean force) {
-    logSyncerThread.addToSyncQueue(force);
-  }
-
-  public void hflush() throws IOException {
+  public void sync() throws IOException {
     synchronized (this.updateLock) {
       if (this.closed) {
         return;
       }
-      boolean logRollRequested = false;
-      if (this.forceSync ||
-          this.unflushedEntries.get() >= this.flushlogentries) {
-        try {
-          long now = System.currentTimeMillis();
-          this.writer.sync();
-          syncTime += System.currentTimeMillis() - now;
-          syncOps++;
-          this.forceSync = false;
-          this.unflushedEntries.set(0);
-
-          // if the number of replicas in HDFS has fallen below the initial
-          // value, then roll logs.
-          try {
-            int numCurrentReplicas = getLogReplication();
-            if (numCurrentReplicas != 0 &&
-                numCurrentReplicas < this.initialReplication) {
-              LOG.warn("HDFS pipeline error detected. " +
-                  "Found " + numCurrentReplicas + " replicas but expecting " +
-                  this.initialReplication + " replicas. " +
-                  " Requesting close of hlog.");
-              requestLogRoll();
-              logRollRequested = true;
-            }
-          } catch (Exception e) {
-            LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
-                " still proceeding ahead...");
+    }
+    try {
+      long now = System.currentTimeMillis();
+      // Done in parallel for all writer threads, thanks to HDFS-895
+      this.writer.sync();
+      synchronized (this.updateLock) {
+        syncTime += System.currentTimeMillis() - now;
+        syncOps++;
+        if (!logRollRequested) {
+          checkLowReplication();
+          if (this.writer.getLength() > this.logrollsize) {
+            requestLogRoll();
           }
-        } catch (IOException e) {
-          LOG.fatal("Could not append. Requesting close of hlog", e);
-          requestLogRoll();
-          throw e;
         }
       }
-      if (!logRollRequested && (this.writer.getLength() > this.logrollsize)) {
+    } catch (IOException e) {
+      LOG.fatal("Could not append. Requesting close of hlog", e);
+      requestLogRoll();
+      throw e;
+    }
+  }
+
+  private void checkLowReplication() {
+    // if the number of replicas in HDFS has fallen below the initial
+    // value, then roll logs.
+    try {
+      int numCurrentReplicas = getLogReplication();
+      if (numCurrentReplicas != 0 &&
+          numCurrentReplicas < this.initialReplication) {
+        LOG.warn("HDFS pipeline error detected. " +
+            "Found " + numCurrentReplicas + " replicas but expecting " +
+            this.initialReplication + " replicas. " +
+            " Requesting close of hlog.");
         requestLogRoll();
+        logRollRequested = true;
       }
+    } catch (Exception e) {
+      LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
+          " still proceeding ahead...");
     }
   }
+
   /**
    * This method gets the datanode replication count for the current HLog.
    *
@@ -1039,7 +972,7 @@
 
   public void hsync() throws IOException {
     // Not yet implemented up in hdfs so just call hflush.
-    hflush();
+    sync();
   }
 
   private void requestLogRoll() {
@@ -1143,7 +1076,7 @@
         }
       }
       // sync txn to file system
-      this.sync(isMetaRegion);
+      this.sync();
     } finally {
       this.cacheFlushLock.unlock();
Index: src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java	(revision 988213)
+++ src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java	(working copy)
@@ -95,7 +95,7 @@
 
   public static final long DEFAULT_MAX_FILESIZE = 1024*1024*256L;
 
-  public static final boolean DEFAULT_DEFERRED_LOG_FLUSH = true;
+  public static final boolean DEFAULT_DEFERRED_LOG_FLUSH = false;
 
   private volatile Boolean meta = null;
   private volatile Boolean root = null;
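
Illustrative note, not part of the patch: with DEFAULT_DEFERRED_LOG_FLUSH now false, a table must opt in before its edits skip the per-append sync() and ride on the LogSyncer's optional flush interval instead. A minimal sketch of that opt-in, assuming the setDeferredLogFlush(boolean) setter that pairs with the isDeferredLogFlush() accessor used above, with hypothetical table and family names:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;

public class DeferredFlushExample {
  // Builds a descriptor for a table that opts in to deferred log flushing.
  // "usertable" and "f" are hypothetical; catalog regions (-ROOT-/.META.) still
  // sync on every append because of the isMetaRegion() check in HLog.append().
  static HTableDescriptor deferredFlushTable() {
    HTableDescriptor desc = new HTableDescriptor("usertable");
    desc.addFamily(new HColumnDescriptor("f"));
    // Edits to this table are flushed by the LogSyncer thread on its
    // optionalFlushInterval instead of on each append.
    desc.setDeferredLogFlush(true);
    return desc;
  }
}

The descriptor is then passed to HBaseAdmin.createTable() as usual; the trade-off is that a regionserver crash can lose up to one flush interval of deferred edits.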