Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java	(revision 1391264)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java	(working copy)
@@ -25,6 +25,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 
@@ -39,8 +40,8 @@
 
   @Override
   public void init(Configuration conf, FileSystem fs,
-                   ReplicationSourceManager manager, Stoppable stopper,
-                   AtomicBoolean replicating, String peerClusterId)
+                   ReplicationSourceManager manager, RegionServerServices services,
+                   Stoppable stopper, AtomicBoolean replicating, String peerClusterId)
       throws IOException {
     this.manager = manager;
     this.peerClusterId = peerClusterId;
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java	(revision 1391264)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java	(working copy)
@@ -115,7 +115,7 @@
     ZKUtil.createWithParents(zkw, "/hbase/replication/state");
     ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationZookeeper.ENABLED_ZNODE_BYTES);
 
-    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir, null);
     manager = replication.getReplicationManager();
     fs = FileSystem.get(conf);
     oldLogDir = new Path(utility.getDataTestDir(),
@@ -199,7 +199,7 @@
     hlog.rollWriter();
 
     manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
-        "1", 0, false);
+        "1", 0, false, false);
 
     HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
         System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java	(revision 1391264)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java	(working copy)
@@ -74,7 +74,7 @@
       public void stop(String why) {}
       @Override
       public boolean isStopped() {return false;}
-    }, FileSystem.get(conf), replicating, logDir, oldLogDir);
+    }, FileSystem.get(conf), replicating, logDir, oldLogDir, null);
   }
 
   /**
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java	(revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java	(working copy)
@@ -39,7 +39,7 @@
    * @throws IOException
    */
   public void initialize(Server rs, FileSystem fs, Path logdir,
-                         Path oldLogDir) throws IOException;
+                         Path oldLogDir, RegionServerServices services) throws IOException;
 
   /**
    * Start replication services.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(working copy)
@@ -204,6 +204,8 @@
 
   // The timestamp (in ms) when the log file was created.
   private volatile long filenum = -1;
+  // Raw URI path recorded by the last log roll; stays null until a roll records one.
+  private volatile String currentFilePath;
 
   //number of transactions in the current Hlog.
   private final AtomicInteger numEntries = new AtomicInteger(0);
@@ -607,6 +609,7 @@
     }
     byte [][] regionsToFlush = null;
     this.cacheFlushLock.lock();
+    Path newPath = null;
     try {
       this.logRollRunning = true;
       if (closed) {
@@ -621,7 +624,7 @@
         oldPath = computeFilename(currentFilenum);
       }
       this.filenum = System.currentTimeMillis();
-      Path newPath = computeFilename();
+      newPath = computeFilename();
 
       // Tell our listeners that a new log is about to be created
       if (!this.listeners.isEmpty()) {
@@ -676,6 +679,10 @@
     } finally {
       try {
         this.logRollRunning = false;
+        // newPath is still null if the try block was left before it was computed
+        if (newPath != null) {
+          this.currentFilePath = newPath.toUri().getRawPath();
+        }
       } finally {
         this.cacheFlushLock.unlock();
       }
@@ -1959,6 +1966,19 @@
     return lastDeferredTxid > syncedTillHere;
   }
 
+  /**
+   * Is the file currently open for writing by HLog. If the filename
+   * doesn't match with the currentFilePath field value, then it is
+   * assumed that the HLog instance has closed/aborted the file.
+   * Returns false if no log roll has recorded a path yet (currentFilePath
+   * is still null) or if filename is null.
+   * @param filename raw URI path of the log file to check
+   * @return true if filename equals the path recorded by the last log roll
+   */
+  public boolean isFileInUse(String filename) {
+    return filename != null && filename.equals(this.currentFilePath);
+  }
+
   /**
    * Pass one or more log file names and it will either dump out a text version
    * on stdout or split the specified log files.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -2381,7 +2381,7 @@
     // create an instance of the replication object.
     ReplicationService service = (ReplicationService)
                               ReflectionUtils.newInstance(clazz, conf);
-    service.initialize(server, fs, logDir, oldLogDir);
+    service.initialize(server, fs, logDir, oldLogDir, server);
     return service;
   }
 
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java	(revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java	(working copy)
@@ -41,6 +41,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -89,6 +90,8 @@
   private final long sleepBeforeFailover;
   // Homemade executer service for replication
   private final ThreadPoolExecutor executor;
+  // the region server services
+  private final RegionServerServices services;
 
   /**
    * Creates a replication manager and sets the watch on all the other
@@ -100,6 +103,7 @@
    * @param replicating the status of the replication on this cluster
    * @param logDir the directory that contains all hlog directories of live RSs
    * @param oldLogDir the directory where old logs are archived
+   * @param services the region server services
    */
   public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
                                   final Configuration conf,
@@ -107,7 +111,8 @@
                                   final FileSystem fs,
                                   final AtomicBoolean replicating,
                                   final Path logDir,
-                                  final Path oldLogDir) {
+                                  final Path oldLogDir,
+                                  final RegionServerServices services) {
     this.sources = new ArrayList();
     this.replicating = replicating;
     this.zkHelper = zkHelper;
@@ -135,6 +140,7 @@
     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
     tfb.setNameFormat("ReplicationExecutor-%d");
     this.executor.setThreadFactory(tfb.build());
+    this.services = services;
   }
 
   /**
@@ -146,11 +152,16 @@
    * @param id id of the peer cluster
    * @param position current location in the log
    * @param queueRecovered indicates if this queue comes from another region server
+   * @param fileInUse is the file currently in use
    */
-  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered,
+      boolean fileInUse) {
     String key = log.getName();
     LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
     this.zkHelper.writeReplicationStatus(key, id, position);
+    if (fileInUse) {
+      return;
+    }
     synchronized (this.hlogsById) {
       SortedSet hlogs = this.hlogsById.get(id);
-      if (!queueRecovered && hlogs.first() != key) {
+      if (!queueRecovered && !hlogs.first().equals(key)) {
@@ -200,7 +211,8 @@
    */
   public ReplicationSourceInterface addSource(String id) throws IOException {
     ReplicationSourceInterface src =
-        getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
+        getReplicationSource(this.conf, this.fs, this, services, stopper,
+          replicating, id);
     synchronized (this.hlogsById) {
       this.sources.add(src);
       this.hlogsById.put(id, new TreeSet());
@@ -297,6 +309,7 @@
    * @param conf the configuration to use
    * @param fs the file system to use
    * @param manager the manager to use
+   * @param services the region server services
    * @param stopper the stopper object for this region server
    * @param replicating the status of the replication on this cluster
    * @param peerId the id of the peer cluster
@@ -307,6 +320,7 @@
       final Configuration conf,
       final FileSystem fs,
       final ReplicationSourceManager manager,
+      final RegionServerServices services,
       final Stoppable stopper,
       final AtomicBoolean replicating,
       final String peerId) throws IOException {
@@ -322,7 +336,7 @@
       src = new ReplicationSource();
 
     }
-    src.init(conf, fs, manager, stopper, replicating, peerId);
+    src.init(conf, fs, manager, services, stopper, replicating, peerId);
     return src;
   }
 
@@ -584,7 +598,7 @@
         String peerId = entry.getKey();
         try {
           ReplicationSourceInterface src = getReplicationSource(conf,
-              fs, ReplicationSourceManager.this, stopper, replicating, peerId);
+              fs, ReplicationSourceManager.this, services, stopper, replicating, peerId);
           if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
             src.terminate("Recovered queue doesn't belong to any current peer");
             break;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java	(revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java	(working copy)
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -67,11 +68,13 @@
    * @param fs handle to the filesystem
    * @param logDir
    * @param oldLogDir directory where logs are archived
+   * @param services the region server services
    * @throws IOException
    */
   public Replication(final Server server, final FileSystem fs,
-      final Path logDir, final Path oldLogDir) throws IOException{
-    initialize(server, fs, logDir, oldLogDir);
+      final Path logDir, final Path oldLogDir, final RegionServerServices services)
+      throws IOException{
+    initialize(server, fs, logDir, oldLogDir, services);
   }
 
   /**
@@ -81,7 +84,7 @@
   }
 
   public void initialize(final Server server, final FileSystem fs,
-      final Path logDir, final Path oldLogDir) throws IOException {
+      final Path logDir, final Path oldLogDir, final RegionServerServices services) throws IOException {
     this.server = server;
     this.conf = this.server.getConfiguration();
     this.replication = isReplication(this.conf);
@@ -93,7 +96,7 @@
             "(replicating=" + this.replicating, ke);
       }
       this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs,
-          this.replicating, logDir, oldLogDir);
+          this.replicating, logDir, oldLogDir, services);
     } else {
       this.replicationManager = null;
       this.zkHelper = null;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(working copy)
@@ -51,6 +51,7 @@
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -96,6 +97,8 @@
   private String peerId;
   // The manager of all sources to which we ping back our progress
   private ReplicationSourceManager manager;
+  // The reference to the services provided by the RS
+  private RegionServerServices services;
   // Should we stop everything?
   private Stoppable stopper;
   // List of chosen sinks (region servers)
@@ -154,6 +157,7 @@
   public void init(final Configuration conf,
                    final FileSystem fs,
                    final ReplicationSourceManager manager,
+                   final RegionServerServices services,
                    final Stoppable stopper,
                    final AtomicBoolean replicating,
                    final String peerClusterZnode)
@@ -182,6 +186,7 @@
     this.random = new Random();
     this.replicating = replicating;
     this.manager = manager;
+    this.services = services;
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.fs = fs;
@@ -290,6 +295,15 @@
         }
         continue;
       }
+      boolean fileInUse = false;
+      // Take a snapshot of whether the file is in use (for writing) NOW.
+      // services can be null (the tests pass null); without a WAL to ask,
+      // fall back to the old behavior and treat the file as not in use.
+      HLog wal = (services == null) ? null : services.getWAL();
+      if (wal != null && wal.isFileInUse(getCurrentPath().toUri().getRawPath())) {
+        LOG.info("File " + getCurrentPath() + " in use");
+        fileInUse = true;
+      }
       // Open a reader on it
       if (!openReader(sleepMultiplier)) {
         // Reset the sleep multiplier, else it'd be reused for the next file
@@ -308,7 +322,7 @@
       boolean gotIOE = false;
       currentNbEntries = 0;
       try {
-        if(readAllEntriesToReplicateOrNextFile()) {
+        if(readAllEntriesToReplicateOrNextFile(fileInUse)) {
           continue;
         }
       } catch (IOException ioe) {
@@ -357,7 +371,7 @@
       if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
         if (this.lastLoggedPosition != this.position) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.position, queueRecovered);
+              this.peerClusterZnode, this.position, queueRecovered, fileInUse);
           this.lastLoggedPosition = this.position;
         }
         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@@ -366,7 +380,7 @@
         continue;
       }
       sleepMultiplier = 1;
-      shipEdits();
+      shipEdits(fileInUse);
 
     }
     if (this.conn != null) {
@@ -383,11 +397,13 @@
   /**
    * Read all the entries from the current log files and retain those
    * that need to be replicated. Else, process the end of the current file.
+   * @param fileInUse is the file in use
    * @return true if we got nothing and went to the next file, false if we got
    * entries
    * @throws IOException
    */
-  protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
+  protected boolean readAllEntriesToReplicateOrNextFile(boolean fileInUse)
+      throws IOException{
     long seenEntries = 0;
     if (this.position != 0) {
       this.reader.seek(this.position);
@@ -437,6 +453,9 @@
     LOG.debug("currentNbOperations:" + currentNbOperations +
         " and seenEntries:" + seenEntries +
         " and size: " + (this.reader.getPosition() - startPosition));
+    if (fileInUse) {
+      return false;
+    }
     // If we didn't get anything and the queue has an object, it means we
     // hit the end of the file for sure
     return seenEntries == 0 && processEndOfFile();
@@ -610,7 +629,8 @@
   /**
    * Do the shipping logic
+   * @param fileInUse is the file in use
    */
-  protected void shipEdits() {
+  protected void shipEdits(boolean fileInUse) {
     int sleepMultiplier = 1;
     if (this.currentNbEntries == 0) {
       LOG.warn("Was given 0 edits to ship");
@@ -630,7 +650,7 @@
             Arrays.copyOf(this.entriesArray, currentNbEntries));
         if (this.lastLoggedPosition != this.position) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.position, queueRecovered);
+              this.peerClusterZnode, this.position, queueRecovered, fileInUse);
           this.lastLoggedPosition = this.position;
         }
         this.totalReplicatedEdits += currentNbEntries;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java	(revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java	(working copy)
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 
 /**
  * Interface that defines a replication source
@@ -38,6 +39,7 @@
    * @param conf the configuration to use
    * @param fs the file system to use
    * @param manager the manager to use
+   * @param services the region server services
    * @param stopper the stopper object for this region server
    * @param replicating the status of the replication on this cluster
    * @param peerClusterId the id of the peer cluster
@@ -46,6 +48,7 @@
   public void init(final Configuration conf,
                    final FileSystem fs,
                    final ReplicationSourceManager manager,
+                   final RegionServerServices services,
                    final Stoppable stopper,
                    final AtomicBoolean replicating,
                    final String peerClusterId) throws IOException;