Index: src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (revision 1386889)
+++ src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (working copy)
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 
@@ -40,8 +41,8 @@
 
   @Override
   public void init(Configuration conf, FileSystem fs,
-      ReplicationSourceManager manager, Stoppable stopper,
-      AtomicBoolean replicating, String peerClusterId)
+      ReplicationSourceManager manager, RegionServerServices services,
+      Stoppable stopper, AtomicBoolean replicating, String peerClusterId)
       throws IOException {
     this.manager = manager;
     this.peerClusterId = peerClusterId;
Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1386889)
+++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy)
@@ -109,7 +109,7 @@
     ZKUtil.createWithParents(zkw, "/hbase/replication/state");
     ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));
 
-    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir, null);
     manager = replication.getReplicationManager();
     fs = FileSystem.get(conf);
     oldLogDir = new Path(utility.getDataTestDir(),
@@ -195,7 +195,7 @@
     hlog.rollWriter();
 
     manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
-        "1", 0, false);
+        "1", 0, false, false);
 
     HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
         System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
Index: src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (revision 1386889)
+++ src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (working copy)
@@ -74,7 +74,7 @@
       public void stop(String why) {}
       @Override
       public boolean isStopped() {return false;}
-    }, FileSystem.get(conf), replicating, logDir, oldLogDir);
+    }, FileSystem.get(conf), replicating, logDir, oldLogDir, null);
   }
 
   /**
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1386889)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy)
@@ -196,6 +196,7 @@
 
   // The timestamp (in ms) when the log file was created.
   private volatile long filenum = -1;
+  private volatile String currentFilePath;
 
   //number of transactions in the current Hlog.
   private final AtomicInteger numEntries = new AtomicInteger(0);
@@ -585,12 +586,13 @@
     byte [][] regionsToFlush = null;
     this.cacheFlushLock.lock();
     this.logRollRunning = true;
+    Path newPath = null;
     try {
       // Do all the preparation outside of the updateLock to block
       // as less as possible the incoming writes
       long currentFilenum = this.filenum;
       this.filenum = System.currentTimeMillis();
-      Path newPath = computeFilename();
+      newPath = computeFilename();
       HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
       // Can we get at the dfsclient outputstream? If an instance of
       // SFLW, it'll have done the necessary reflection to get at the
@@ -642,6 +644,7 @@
       }
     } finally {
       this.logRollRunning = false;
+      this.currentFilePath = newPath.toUri().getRawPath();
       this.cacheFlushLock.unlock();
     }
     return regionsToFlush;
@@ -1679,6 +1682,14 @@
     return lastDeferredSeq;
   }
 
+  /** Is the file currently open for writing by HLog. If the filename
+   * doesn't match with the currentFilePath field value, then it is
+   * assumed that the HLog instance has closed/aborted the file.
+   */
+  public boolean isFileInUse(String filename) {
+    return currentFilePath.equals(filename);
+  }
+
   /**
    * Pass one or more log file names and it will either dump out a text version
    * on stdout or split the specified log files.
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1386889)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -1201,7 +1201,7 @@
     // log directories.
     try {
       this.replicationHandler = Replication.isReplication(this.conf)?
-        new Replication(this, this.fs, logdir, oldLogDir): null;
+        new Replication(this, this.fs, logdir, oldLogDir, this): null;
     } catch (KeeperException e) {
       throw new IOException("Failed replication handler create", e);
     }
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1386889)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy)
@@ -42,6 +42,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -87,6 +88,8 @@
   private final long sleepBeforeFailover;
   // Homemade executer service for replication
   private final ThreadPoolExecutor executor;
+  // the region server services
+  private final RegionServerServices services;
 
   /**
    * Creates a replication manager and sets the watch on all the other
@@ -98,6 +101,7 @@
   * @param replicating the status of the replication on this cluster
   * @param logDir the directory that contains all hlog directories of live RSs
   * @param oldLogDir the directory where old logs are archived
+   * @param services the region server services
   */
  public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
                                  final Configuration conf,
@@ -105,7 +109,8 @@
                                  final FileSystem fs,
                                  final AtomicBoolean replicating,
                                  final Path logDir,
-                                  final Path oldLogDir) {
+                                  final Path oldLogDir,
+                                  final RegionServerServices services) {
     this.sources = new ArrayList<ReplicationSourceInterface>();
     this.replicating = replicating;
     this.zkHelper = zkHelper;
@@ -133,6 +138,7 @@
     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
     tfb.setNameFormat("ReplicationExecutor-%d");
     this.executor.setThreadFactory(tfb.build());
+    this.services = services;
   }
 
   /**
@@ -144,11 +150,16 @@
   * @param id id of the peer cluster
   * @param position current location in the log
   * @param queueRecovered indicates if this queue comes from another region server
+   * @param fileInUse is the file currently in use
   */
-  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered,
+      boolean fileInUse) {
     String key = log.getName();
     LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
     this.zkHelper.writeReplicationStatus(key, id, position);
+    if (fileInUse) {
+      return;
+    }
     synchronized (this.hlogsById) {
       SortedSet<String> hlogs = this.hlogsById.get(id);
       if (!queueRecovered && hlogs.first() != key) {
@@ -198,7 +209,7 @@
   */
  public ReplicationSourceInterface addSource(String id) throws IOException {
    ReplicationSourceInterface src =
-        getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
+        getReplicationSource(this.conf, this.fs, this, services, stopper, replicating, id);
    // TODO set it to what's in ZK
    src.setSourceEnabled(true);
    synchronized (this.hlogsById) {
@@ -297,6 +308,7 @@
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use
+   * @param services the region server services
   * @param stopper the stopper object for this region server
   * @param replicating the status of the replication on this cluster
   * @param peerId the id of the peer cluster
@@ -307,6 +319,7 @@
      final Configuration conf,
      final FileSystem fs,
      final ReplicationSourceManager manager,
+      final RegionServerServices services,
      final Stoppable stopper,
      final AtomicBoolean replicating,
      final String peerId) throws IOException {
@@ -322,7 +335,7 @@
      src = new ReplicationSource();
 
    }
-    src.init(conf, fs, manager, stopper, replicating, peerId);
+    src.init(conf, fs, manager, services, stopper, replicating, peerId);
    return src;
  }
 
@@ -584,7 +597,7 @@
        String peerId = entry.getKey();
        try {
          ReplicationSourceInterface src = getReplicationSource(conf,
-              fs, ReplicationSourceManager.this, stopper, replicating, peerId);
+              fs, ReplicationSourceManager.this, services, stopper, replicating, peerId);
          if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
            src.terminate("Recovered queue doesn't belong to any current peer");
            break;
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (revision 1386889)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (working copy)
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -67,7 +68,7 @@
   * @throws KeeperException
   */
  public Replication(final Server server, final FileSystem fs,
-      final Path logDir, final Path oldLogDir)
+      final Path logDir, final Path oldLogDir, final RegionServerServices services)
      throws IOException, KeeperException {
    this.server = server;
    this.conf = this.server.getConfiguration();
@@ -75,7 +76,7 @@
    if (replication) {
      this.zkHelper = new ReplicationZookeeper(server, this.replicating);
      this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
-          this.server, fs, this.replicating, logDir, oldLogDir) ;
+          this.server, fs, this.replicating, logDir, oldLogDir, services) ;
    } else {
      this.replicationManager = null;
      this.zkHelper = null;
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1386889)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy)
@@ -50,6 +50,7 @@
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -93,6 +94,8 @@
   private String peerId;
   // The manager of all sources to which we ping back our progress
   private ReplicationSourceManager manager;
+  // The reference to the services provided by the RS
+  private RegionServerServices services;
   // Should we stop everything?
   private Stoppable stopper;
   // List of chosen sinks (region servers)
@@ -154,6 +157,7 @@
   public void init(final Configuration conf,
                    final FileSystem fs,
                    final ReplicationSourceManager manager,
+                   final RegionServerServices services,
                    final Stoppable stopper,
                    final AtomicBoolean replicating,
                    final String peerClusterZnode)
@@ -182,6 +186,7 @@
     this.random = new Random();
     this.replicating = replicating;
     this.manager = manager;
+    this.services = services;
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.fs = fs;
@@ -289,6 +294,12 @@
        }
        continue;
      }
+      boolean fileInUse = false;
+      //take a snapshot of whether the file is in use (for writing) NOW
+      if (services.getWAL().isFileInUse(getCurrentPath().toUri().getRawPath())) {
+        LOG.info("File " + getCurrentPath() + " in use");
+        fileInUse = true;
+      }
      // Open a reader on it
      if (!openReader(sleepMultiplier)) {
        // Reset the sleep multiplier, else it'd be reused for the next file
@@ -307,7 +318,7 @@
      boolean gotIOE = false;
      currentNbEntries = 0;
      try {
-        if(readAllEntriesToReplicateOrNextFile()) {
+        if(readAllEntriesToReplicateOrNextFile(fileInUse)) {
          continue;
        }
      } catch (IOException ioe) {
@@ -360,7 +371,7 @@
      if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
        if (this.lastLoggedPosition != this.position) {
          this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.position, queueRecovered);
+              this.peerClusterZnode, this.position, queueRecovered, fileInUse);
          this.lastLoggedPosition = this.position;
        }
        if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@@ -369,7 +380,7 @@
          continue;
        }
        sleepMultiplier = 1;
-        shipEdits();
+        shipEdits(fileInUse);
      }
 
      if (this.conn != null) {
@@ -385,11 +396,13 @@
  /**
   * Read all the entries from the current log files and retain those
   * that need to be replicated. Else, process the end of the current file.
+   * @param fileInUse is the file in use
   * @return true if we got nothing and went to the next file, false if we got
   * entries
   * @throws IOException
   */
-  protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
+  protected boolean readAllEntriesToReplicateOrNextFile(boolean fileInUse)
+      throws IOException{
    long seenEntries = 0;
    if (this.position != 0) {
      this.reader.seek(this.position);
@@ -438,6 +451,9 @@
    LOG.debug("currentNbOperations:" + currentNbOperations +
        " and seenEntries:" + seenEntries +
        " and size: " + (this.reader.getPosition() - this.position));
+    if (fileInUse) {
+      return false;
+    }
    // If we didn't get anything and the queue has an object, it means we
    // hit the end of the file for sure
    return seenEntries == 0 && processEndOfFile();
@@ -601,7 +617,7 @@
  /**
   * Do the shipping logic
   */
-  protected void shipEdits() {
+  protected void shipEdits(boolean fileInUse) {
    int sleepMultiplier = 1;
    if (this.currentNbEntries == 0) {
      LOG.warn("Was given 0 edits to ship");
@@ -614,7 +630,7 @@
      rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
      if (this.lastLoggedPosition != this.position) {
        this.manager.logPositionAndCleanOldLogs(this.currentPath,
-            this.peerClusterZnode, this.position, queueRecovered);
+            this.peerClusterZnode, this.position, queueRecovered, fileInUse);
        this.lastLoggedPosition = this.position;
      }
      this.totalReplicatedEdits += currentNbEntries;
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (revision 1386889)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (working copy)
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 
 /**
  * Interface that defines a replication source
@@ -37,6 +38,7 @@
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use
+   * @param services the region server services
   * @param stopper the stopper object for this region server
   * @param replicating the status of the replication on this cluster
   * @param peerClusterId the id of the peer cluster
@@ -45,6 +47,7 @@
  public void init(final Configuration conf,
      final FileSystem fs,
      final ReplicationSourceManager manager,
+      final RegionServerServices services,
      final Stoppable stopper,
      final AtomicBoolean replicating,
      final String peerClusterId) throws IOException;
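
Note: taken together, the hunks above thread a RegionServerServices reference from HRegionServer down to each ReplicationSource so the source can ask the region server's WAL (the new HLog.isFileInUse) whether the log it is about to read is still open for writing. The snapshot is taken once per pass of the read loop, before the reader is opened; when it reports the file as in use, the source neither treats an empty read as end-of-file nor lets ReplicationSourceManager clean that log out of its queue. The following stand-alone sketch illustrates that snapshot-then-guard pattern in isolation; it is not part of the patch, and every name in it except isFileInUse is an illustrative stand-in.

import java.util.concurrent.atomic.AtomicReference;

/** Stand-alone illustration of the "snapshot fileInUse, then guard end-of-file
 *  handling and log cleanup" pattern introduced by the patch above. */
public class FileInUseSketch {

  /** Minimal stand-in for HLog: remembers the path currently open for writing. */
  static class Wal {
    private final AtomicReference<String> currentFilePath = new AtomicReference<String>();

    /** Analogous to rollWriter() recording currentFilePath after a roll. */
    void rollTo(String newPath) {
      currentFilePath.set(newPath);
    }

    /** Same contract as the new HLog.isFileInUse: true while this path is the writer's. */
    boolean isFileInUse(String path) {
      return path.equals(currentFilePath.get());
    }
  }

  /** One pass of a reader loop, loosely analogous to ReplicationSource.run(). */
  static void readOnePass(Wal wal, String logPath) {
    // Take the snapshot BEFORE opening/reading, so the whole pass uses one answer.
    boolean fileInUse = wal.isFileInUse(logPath);

    int entriesRead = readEntries(logPath);  // hypothetical read step

    if (entriesRead == 0 && !fileInUse) {
      // Only a log that is no longer being written may be treated as finished
      // and handed to cleanup (cf. readAllEntriesToReplicateOrNextFile and
      // logPositionAndCleanOldLogs in the patch).
      processEndOfFile(logPath);
    }
  }

  private static int readEntries(String logPath) { return 0; }  // placeholder
  private static void processEndOfFile(String logPath) { }      // placeholder
}

The guard is deliberately conservative: a log that is still the writer's current file always stays in the replication queue, and only a later pass that observes it rolled may advance past it or allow it to be archived.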