Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1386481)
+++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy)
@@ -109,7 +109,7 @@
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));
- replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+ replication = new Replication(new DummyServer(), fs, logDir, oldLogDir, null);
manager = replication.getReplicationManager();
fs = FileSystem.get(conf);
oldLogDir = new Path(utility.getDataTestDir(),
@@ -195,7 +195,7 @@
hlog.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
- "1", 0, false);
+ "1", 0, false, false);
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
Index: src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (revision 1386481)
+++ src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (working copy)
@@ -74,7 +74,7 @@
public void stop(String why) {}
@Override
public boolean isStopped() {return false;}
- }, FileSystem.get(conf), replicating, logDir, oldLogDir);
+ }, FileSystem.get(conf), replicating, logDir, oldLogDir,null);
}
/**
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1386481)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy)
@@ -196,6 +196,7 @@
// The timestamp (in ms) when the log file was created.
private volatile long filenum = -1;
+ private volatile String currentFilePath;
//number of transactions in the current Hlog.
private final AtomicInteger numEntries = new AtomicInteger(0);
@@ -585,12 +586,13 @@
byte [][] regionsToFlush = null;
this.cacheFlushLock.lock();
this.logRollRunning = true;
+ Path newPath = null;
try {
// Do all the preparation outside of the updateLock to block
// as less as possible the incoming writes
long currentFilenum = this.filenum;
this.filenum = System.currentTimeMillis();
- Path newPath = computeFilename();
+ newPath = computeFilename();
HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
@@ -642,6 +644,7 @@
}
} finally {
this.logRollRunning = false;
+ this.currentFilePath = newPath.toUri().getRawPath();
this.cacheFlushLock.unlock();
}
return regionsToFlush;
@@ -1679,6 +1682,14 @@
return lastDeferredSeq;
}
+ /** Returns whether the given filename is the file currently open for
+ * writing by this HLog. A non-match means the HLog instance is assumed
+ * to have closed/aborted the file. NOTE(review): currentFilePath is only
+ * assigned in rollWriter(); calling this before the first roll NPEs. */
+ public boolean isFileInUse(String filename) {
+ return currentFilePath.equals(filename);
+ }
+
/**
* Pass one or more log file names and it will either dump out a text version
* on stdout or split the specified log files.
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1386481)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -1201,7 +1201,7 @@
// log directories.
try {
this.replicationHandler = Replication.isReplication(this.conf)?
- new Replication(this, this.fs, logdir, oldLogDir): null;
+ new Replication(this, this.fs, logdir, oldLogDir, this): null;
} catch (KeeperException e) {
throw new IOException("Failed replication handler create", e);
}
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1386481)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy)
@@ -42,6 +42,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -87,6 +88,8 @@
private final long sleepBeforeFailover;
// Homemade executer service for replication
private final ThreadPoolExecutor executor;
+ // the owning HRegionServer; may be null (tests pass null) — guard before use
+ private final HRegionServer hServer;
/**
* Creates a replication manager and sets the watch on all the other
@@ -98,6 +101,7 @@
* @param replicating the status of the replication on this cluster
* @param logDir the directory that contains all hlog directories of live RSs
* @param oldLogDir the directory where old logs are archived
+ * @param hServer the owning HRegionServer instance; may be null (e.g. in tests)
*/
public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
final Configuration conf,
@@ -105,7 +109,8 @@
final FileSystem fs,
final AtomicBoolean replicating,
final Path logDir,
- final Path oldLogDir) {
+ final Path oldLogDir,
+ final HRegionServer hServer) {
this.sources = new ArrayList();
this.replicating = replicating;
this.zkHelper = zkHelper;
@@ -133,6 +138,7 @@
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat("ReplicationExecutor-%d");
this.executor.setThreadFactory(tfb.build());
+ this.hServer = hServer;
}
/**
@@ -144,11 +150,16 @@
* @param id id of the peer cluster
* @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server
+ * @param fileNotInUse true if the log file is no longer open for writing, so old logs may be cleaned
*/
- public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+ public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered,
+ boolean fileNotInUse) {
String key = log.getName();
LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
this.zkHelper.writeReplicationStatus(key, id, position);
+ if (!fileNotInUse) {
+ return;
+ }
synchronized (this.hlogsById) {
SortedSet hlogs = this.hlogsById.get(id);
if (!queueRecovered && hlogs.first() != key) {
@@ -252,6 +263,10 @@
return this.sources;
}
+ public HRegionServer getRegionServerInstance() {
+ return hServer;
+ }
+
void logRolled(Path newLog) throws IOException {
if (!this.replicating.get()) {
LOG.warn("Replication stopped, won't add new log");
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (revision 1386481)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (working copy)
@@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -67,7 +68,7 @@
* @throws KeeperException
*/
public Replication(final Server server, final FileSystem fs,
- final Path logDir, final Path oldLogDir)
+ final Path logDir, final Path oldLogDir, final HRegionServer hServer)
throws IOException, KeeperException {
this.server = server;
this.conf = this.server.getConfiguration();
@@ -75,7 +76,7 @@
if (replication) {
this.zkHelper = new ReplicationZookeeper(server, this.replicating);
this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
- this.server, fs, this.replicating, logDir, oldLogDir) ;
+ this.server, fs, this.replicating, logDir, oldLogDir, hServer) ;
} else {
this.replicationManager = null;
this.zkHelper = null;
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1386481)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy)
@@ -289,6 +289,13 @@
}
continue;
}
+ boolean fileNotInUse = true;
+ // Snapshot NOW, before reading, whether the WAL still has this file open for writing
+ if (manager.getRegionServerInstance().getWAL()
+ .isFileInUse(getCurrentPath().toUri().getRawPath())) {
+ LOG.info("FILE " + getCurrentPath() + " in use");
+ fileNotInUse = false;
+ }
// Open a reader on it
if (!openReader(sleepMultiplier)) {
// Reset the sleep multiplier, else it'd be reused for the next file
@@ -360,7 +367,7 @@
if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered);
+ this.peerClusterZnode, this.position, queueRecovered, fileNotInUse);
this.lastLoggedPosition = this.position;
}
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@@ -369,7 +376,7 @@
continue;
}
sleepMultiplier = 1;
- shipEdits();
+ shipEdits(fileNotInUse);
}
if (this.conn != null) {
@@ -385,11 +392,13 @@
/**
* Read all the entries from the current log files and retain those
* that need to be replicated. Else, process the end of the current file.
+ * @param fileNotInUse true if the file is no longer open for writing; when false, end-of-file handling is skipped
* @return true if we got nothing and went to the next file, false if we got
* entries
* @throws IOException
*/
- protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
+ protected boolean readAllEntriesToReplicateOrNextFile(boolean fileNotInUse)
+ throws IOException{
long seenEntries = 0;
if (this.position != 0) {
this.reader.seek(this.position);
@@ -438,6 +447,9 @@
LOG.debug("currentNbOperations:" + currentNbOperations +
" and seenEntries:" + seenEntries +
" and size: " + (this.reader.getPosition() - this.position));
+ if (!fileNotInUse) {
+ return false;
+ }
// If we didn't get anything and the queue has an object, it means we
// hit the end of the file for sure
return seenEntries == 0 && processEndOfFile();
@@ -601,7 +613,7 @@
/**
* Do the shipping logic
*/
- protected void shipEdits() {
+ protected void shipEdits(boolean fileNotInUse) {
int sleepMultiplier = 1;
if (this.currentNbEntries == 0) {
LOG.warn("Was given 0 edits to ship");
@@ -614,7 +626,7 @@
rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered);
+ this.peerClusterZnode, this.position, queueRecovered, fileNotInUse);
this.lastLoggedPosition = this.position;
}
this.totalReplicatedEdits += currentNbEntries;