Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1386481) +++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy) @@ -109,7 +109,7 @@ ZKUtil.createWithParents(zkw, "/hbase/replication/state"); ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true")); - replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); + replication = new Replication(new DummyServer(), fs, logDir, oldLogDir, null); manager = replication.getReplicationManager(); fs = FileSystem.get(conf); oldLogDir = new Path(utility.getDataTestDir(), @@ -195,7 +195,7 @@ hlog.rollWriter(); manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), - "1", 0, false); + "1", 0, false, false); HLogKey key = new HLogKey(hri.getRegionName(), test, seq++, System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); Index: src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (revision 1386481) +++ src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (working copy) @@ -74,7 +74,7 @@ public void stop(String why) {} @Override public boolean isStopped() {return false;} - }, FileSystem.get(conf), replicating, logDir, oldLogDir); + }, FileSystem.get(conf), replicating, logDir, oldLogDir,null); } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1386481) +++ 
src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -196,6 +196,7 @@ // The timestamp (in ms) when the log file was created. private volatile long filenum = -1; + private volatile String currentFilePath; //number of transactions in the current Hlog. private final AtomicInteger numEntries = new AtomicInteger(0); @@ -585,12 +586,13 @@ byte [][] regionsToFlush = null; this.cacheFlushLock.lock(); this.logRollRunning = true; + Path newPath = null; try { // Do all the preparation outside of the updateLock to block // as less as possible the incoming writes long currentFilenum = this.filenum; this.filenum = System.currentTimeMillis(); - Path newPath = computeFilename(); + newPath = computeFilename(); HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf); // Can we get at the dfsclient outputstream? If an instance of // SFLW, it'll have done the necessary reflection to get at the @@ -642,6 +644,7 @@ } } finally { this.logRollRunning = false; + this.currentFilePath = newPath.toUri().getRawPath(); this.cacheFlushLock.unlock(); } return regionsToFlush; @@ -1679,6 +1682,14 @@ return lastDeferredSeq; } + /** Is the file currently open for writing by HLog. If the filename + * doesn't match the currentFilePath field value, then it is + * assumed that the HLog instance has closed/aborted the file. + * (NOTE(review): currentFilePath is null until the first rollWriter, + * so calling this before any roll throws NPE; likewise newPath may be + * null in the finally block if computeFilename() threw — confirm.) + */ + public boolean isFileInUse(String filename) { + return currentFilePath.equals(filename); + } + /** * Pass one or more log file names and it will either dump out a text version * on stdout or split the specified log files. Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1386481) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -1201,7 +1201,7 @@ // log directories.
try { this.replicationHandler = Replication.isReplication(this.conf)? - new Replication(this, this.fs, logdir, oldLogDir): null; + new Replication(this, this.fs, logdir, oldLogDir, this): null; } catch (KeeperException e) { throw new IOException("Failed replication handler create", e); } Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1386481) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy) @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -87,6 +88,8 @@ private final long sleepBeforeFailover; // Homemade executer service for replication private final ThreadPoolExecutor executor; + // the HRegionServer instance + private final HRegionServer hServer; /** * Creates a replication manager and sets the watch on all the other @@ -98,6 +101,7 @@ * @param replicating the status of the replication on this cluster * @param logDir the directory that contains all hlog directories of live RSs * @param oldLogDir the directory where old logs are archived + * @param hServer the instance of HRegionServer */ public ReplicationSourceManager(final ReplicationZookeeper zkHelper, final Configuration conf, @@ -105,7 +109,8 @@ final FileSystem fs, final AtomicBoolean replicating, final Path logDir, - final Path oldLogDir) { + final Path oldLogDir, + final HRegionServer hServer) { this.sources = new ArrayList(); this.replicating = replicating; this.zkHelper = zkHelper; @@ -133,6 
+138,7 @@ ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); this.executor.setThreadFactory(tfb.build()); + this.hServer = hServer; } /** @@ -144,11 +150,16 @@ * @param id id of the peer cluster * @param position current location in the log * @param queueRecovered indicates if this queue comes from another region server + * @param fileNotInUse true if the log file is no longer being written to, so old logs may be cleaned up */ - public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) { + public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered, + boolean fileNotInUse) { String key = log.getName(); LOG.info("Going to report log #" + key + " for position " + position + " in " + log); this.zkHelper.writeReplicationStatus(key, id, position); + if (!fileNotInUse) { + return; + } synchronized (this.hlogsById) { SortedSet hlogs = this.hlogsById.get(id); if (!queueRecovered && hlogs.first() != key) { @@ -252,6 +263,10 @@ return this.sources; } + public HRegionServer getRegionServerInstance() { + return hServer; + } + void logRolled(Path newLog) throws IOException { if (!this.replicating.get()) { LOG.warn("Replication stopped, won't add new log"); Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (revision 1386481) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (working copy) @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -67,7 +68,7 @@ * @throws
KeeperException */ public Replication(final Server server, final FileSystem fs, - final Path logDir, final Path oldLogDir) + final Path logDir, final Path oldLogDir, final HRegionServer hServer) throws IOException, KeeperException { this.server = server; this.conf = this.server.getConfiguration(); @@ -75,7 +76,7 @@ if (replication) { this.zkHelper = new ReplicationZookeeper(server, this.replicating); this.replicationManager = new ReplicationSourceManager(zkHelper, conf, - this.server, fs, this.replicating, logDir, oldLogDir) ; + this.server, fs, this.replicating, logDir, oldLogDir, hServer) ; } else { this.replicationManager = null; this.zkHelper = null; Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1386481) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -289,6 +289,13 @@ } continue; } + boolean fileNotInUse = true; + //take a snapshot of whether the file is in use (for writing) NOW + if (manager.getRegionServerInstance().getWAL() + .isFileInUse(getCurrentPath().toUri().getRawPath())) { + LOG.info("FILE " + getCurrentPath() + " in use"); + fileNotInUse = false; + } // Open a reader on it if (!openReader(sleepMultiplier)) { // Reset the sleep multiplier, else it'd be reused for the next file @@ -360,7 +367,7 @@ if (this.isActive() && (gotIOE || currentNbEntries == 0)) { if (this.lastLoggedPosition != this.position) { this.manager.logPositionAndCleanOldLogs(this.currentPath, - this.peerClusterZnode, this.position, queueRecovered); + this.peerClusterZnode, this.position, queueRecovered, fileNotInUse); this.lastLoggedPosition = this.position; } if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { @@ -369,7 +376,7 @@ continue; } sleepMultiplier = 1; - shipEdits(); + 
shipEdits(fileNotInUse); } if (this.conn != null) { @@ -385,11 +392,13 @@ /** * Read all the entries from the current log files and retain those * that need to be replicated. Else, process the end of the current file. + * @param fileNotInUse true if the current log file is no longer being written to * @return true if we got nothing and went to the next file, false if we got * entries * @throws IOException */ - protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{ + protected boolean readAllEntriesToReplicateOrNextFile(boolean fileNotInUse) + throws IOException{ long seenEntries = 0; if (this.position != 0) { this.reader.seek(this.position); @@ -438,6 +447,9 @@ LOG.debug("currentNbOperations:" + currentNbOperations + " and seenEntries:" + seenEntries + " and size: " + (this.reader.getPosition() - this.position)); + if (!fileNotInUse) { + return false; + } // If we didn't get anything and the queue has an object, it means we // hit the end of the file for sure return seenEntries == 0 && processEndOfFile(); @@ -601,7 +613,7 @@ /** * Do the shipping logic */ - protected void shipEdits() { + protected void shipEdits(boolean fileNotInUse) { int sleepMultiplier = 1; if (this.currentNbEntries == 0) { LOG.warn("Was given 0 edits to ship"); @@ -614,7 +626,7 @@ rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries)); if (this.lastLoggedPosition != this.position) { this.manager.logPositionAndCleanOldLogs(this.currentPath, - this.peerClusterZnode, this.position, queueRecovered); + this.peerClusterZnode, this.position, queueRecovered, fileNotInUse); this.lastLoggedPosition = this.position; } this.totalReplicatedEdits += currentNbEntries;