Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (revision 1391264)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (working copy)
@@ -25,6 +25,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
@@ -39,8 +40,8 @@
@Override
public void init(Configuration conf, FileSystem fs,
- ReplicationSourceManager manager, Stoppable stopper,
- AtomicBoolean replicating, String peerClusterId)
+ ReplicationSourceManager manager, RegionServerServices services,
+ Stoppable stopper, AtomicBoolean replicating, String peerClusterId)
throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1391264)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy)
@@ -115,7 +115,7 @@
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationZookeeper.ENABLED_ZNODE_BYTES);
- replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+ replication = new Replication(new DummyServer(), fs, logDir, oldLogDir, null);
manager = replication.getReplicationManager();
fs = FileSystem.get(conf);
oldLogDir = new Path(utility.getDataTestDir(),
@@ -199,7 +199,7 @@
hlog.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
- "1", 0, false);
+ "1", 0, false, false);
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (revision 1391264)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (working copy)
@@ -74,7 +74,7 @@
public void stop(String why) {}
@Override
public boolean isStopped() {return false;}
- }, FileSystem.get(conf), replicating, logDir, oldLogDir);
+ }, FileSystem.get(conf), replicating, logDir, oldLogDir, null);
}
/**
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java (revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java (working copy)
@@ -39,7 +39,7 @@
* @throws IOException
*/
public void initialize(Server rs, FileSystem fs, Path logdir,
- Path oldLogDir) throws IOException;
+ Path oldLogDir, RegionServerServices services) throws IOException;
/**
* Start replication services.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy)
@@ -204,6 +204,7 @@
// The timestamp (in ms) when the log file was created.
private volatile long filenum = -1;
+ private volatile String currentFilePath;
//number of transactions in the current Hlog.
private final AtomicInteger numEntries = new AtomicInteger(0);
@@ -607,6 +608,7 @@
}
byte [][] regionsToFlush = null;
this.cacheFlushLock.lock();
+ Path newPath = null;
try {
this.logRollRunning = true;
if (closed) {
@@ -621,7 +623,7 @@
oldPath = computeFilename(currentFilenum);
}
this.filenum = System.currentTimeMillis();
- Path newPath = computeFilename();
+ newPath = computeFilename();
// Tell our listeners that a new log is about to be created
if (!this.listeners.isEmpty()) {
@@ -676,6 +678,7 @@
} finally {
try {
this.logRollRunning = false;
+ this.currentFilePath = newPath.toUri().getRawPath();
} finally {
this.cacheFlushLock.unlock();
}
@@ -1959,6 +1962,14 @@
return lastDeferredTxid > syncedTillHere;
}
+ /** Is the file currently open for writing by HLog. If the filename
+ * doesn't match with the currentFilePath field value, then it is
+ * assumed that the HLog instance has closed/aborted the file.
+ */
+ public boolean isFileInUse(String filename) {
+ return currentFilePath.equals(filename);
+ }
+
/**
* Pass one or more log file names and it will either dump out a text version
* on stdout or split the specified log files.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -2381,7 +2381,7 @@
// create an instance of the replication object.
ReplicationService service = (ReplicationService)
ReflectionUtils.newInstance(clazz, conf);
- service.initialize(server, fs, logDir, oldLogDir);
+ service.initialize(server, fs, logDir, oldLogDir, server);
return service;
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy)
@@ -41,6 +41,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -89,6 +90,8 @@
private final long sleepBeforeFailover;
// Homemade executer service for replication
private final ThreadPoolExecutor executor;
+ // the region server services
+ private final RegionServerServices services;
/**
* Creates a replication manager and sets the watch on all the other
@@ -100,6 +103,7 @@
* @param replicating the status of the replication on this cluster
* @param logDir the directory that contains all hlog directories of live RSs
* @param oldLogDir the directory where old logs are archived
+ * @param services the region server services
*/
public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
final Configuration conf,
@@ -107,7 +111,8 @@
final FileSystem fs,
final AtomicBoolean replicating,
final Path logDir,
- final Path oldLogDir) {
+ final Path oldLogDir,
+ final RegionServerServices services) {
this.sources = new ArrayList();
this.replicating = replicating;
this.zkHelper = zkHelper;
@@ -135,6 +140,7 @@
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat("ReplicationExecutor-%d");
this.executor.setThreadFactory(tfb.build());
+ this.services = services;
}
/**
@@ -146,11 +152,16 @@
* @param id id of the peer cluster
* @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server
+ * @param fileInUse is the file currently in use
*/
- public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+ public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered,
+ boolean fileInUse) {
String key = log.getName();
LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
this.zkHelper.writeReplicationStatus(key, id, position);
+ if (fileInUse) {
+ return;
+ }
synchronized (this.hlogsById) {
SortedSet hlogs = this.hlogsById.get(id);
if (!queueRecovered && hlogs.first() != key) {
@@ -200,7 +211,8 @@
*/
public ReplicationSourceInterface addSource(String id) throws IOException {
ReplicationSourceInterface src =
- getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
+ getReplicationSource(this.conf, this.fs, this, services, stopper,
+ replicating, id);
synchronized (this.hlogsById) {
this.sources.add(src);
this.hlogsById.put(id, new TreeSet());
@@ -297,6 +309,7 @@
* @param conf the configuration to use
* @param fs the file system to use
* @param manager the manager to use
+ * @param services the region server services
* @param stopper the stopper object for this region server
* @param replicating the status of the replication on this cluster
* @param peerId the id of the peer cluster
@@ -307,6 +320,7 @@
final Configuration conf,
final FileSystem fs,
final ReplicationSourceManager manager,
+ final RegionServerServices services,
final Stoppable stopper,
final AtomicBoolean replicating,
final String peerId) throws IOException {
@@ -322,7 +336,7 @@
src = new ReplicationSource();
}
- src.init(conf, fs, manager, stopper, replicating, peerId);
+ src.init(conf, fs, manager, services, stopper, replicating, peerId);
return src;
}
@@ -584,7 +598,7 @@
String peerId = entry.getKey();
try {
ReplicationSourceInterface src = getReplicationSource(conf,
- fs, ReplicationSourceManager.this, stopper, replicating, peerId);
+ fs, ReplicationSourceManager.this, services, stopper, replicating, peerId);
if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (working copy)
@@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -67,11 +68,13 @@
* @param fs handle to the filesystem
* @param logDir
* @param oldLogDir directory where logs are archived
+ * @param services the region server services
* @throws IOException
*/
public Replication(final Server server, final FileSystem fs,
- final Path logDir, final Path oldLogDir) throws IOException{
- initialize(server, fs, logDir, oldLogDir);
+ final Path logDir, final Path oldLogDir, final RegionServerServices services)
+ throws IOException{
+ initialize(server, fs, logDir, oldLogDir, services);
}
/**
@@ -81,7 +84,7 @@
}
public void initialize(final Server server, final FileSystem fs,
- final Path logDir, final Path oldLogDir) throws IOException {
+ final Path logDir, final Path oldLogDir, final RegionServerServices services) throws IOException {
this.server = server;
this.conf = this.server.getConfiguration();
this.replication = isReplication(this.conf);
@@ -93,7 +96,7 @@
"(replicating=" + this.replicating, ke);
}
this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs,
- this.replicating, logDir, oldLogDir);
+ this.replicating, logDir, oldLogDir, services);
} else {
this.replicationManager = null;
this.zkHelper = null;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy)
@@ -51,6 +51,7 @@
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -96,6 +97,8 @@
private String peerId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
+ // The reference to the services provided by the RS
+ private RegionServerServices services;
// Should we stop everything?
private Stoppable stopper;
// List of chosen sinks (region servers)
@@ -154,6 +157,7 @@
public void init(final Configuration conf,
final FileSystem fs,
final ReplicationSourceManager manager,
+ final RegionServerServices services,
final Stoppable stopper,
final AtomicBoolean replicating,
final String peerClusterZnode)
@@ -182,6 +186,7 @@
this.random = new Random();
this.replicating = replicating;
this.manager = manager;
+ this.services = services;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
@@ -290,6 +295,12 @@
}
continue;
}
+ boolean fileInUse = false;
+ //take a snapshot of whether the file is in use (for writing) NOW
+ if (services.getWAL().isFileInUse(getCurrentPath().toUri().getRawPath())) {
+ LOG.info("File " + getCurrentPath() + " in use");
+ fileInUse = true;
+ }
// Open a reader on it
if (!openReader(sleepMultiplier)) {
// Reset the sleep multiplier, else it'd be reused for the next file
@@ -308,7 +319,7 @@
boolean gotIOE = false;
currentNbEntries = 0;
try {
- if(readAllEntriesToReplicateOrNextFile()) {
+ if(readAllEntriesToReplicateOrNextFile(fileInUse)) {
continue;
}
} catch (IOException ioe) {
@@ -357,7 +368,7 @@
if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered);
+ this.peerClusterZnode, this.position, queueRecovered, fileInUse);
this.lastLoggedPosition = this.position;
}
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@@ -366,7 +377,7 @@
continue;
}
sleepMultiplier = 1;
- shipEdits();
+ shipEdits(fileInUse);
}
if (this.conn != null) {
@@ -383,11 +394,13 @@
/**
* Read all the entries from the current log files and retain those
* that need to be replicated. Else, process the end of the current file.
+ * @param fileInUse is the file in use
* @return true if we got nothing and went to the next file, false if we got
* entries
* @throws IOException
*/
- protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
+ protected boolean readAllEntriesToReplicateOrNextFile(boolean fileInUse)
+ throws IOException{
long seenEntries = 0;
if (this.position != 0) {
this.reader.seek(this.position);
@@ -437,6 +450,9 @@
LOG.debug("currentNbOperations:" + currentNbOperations +
" and seenEntries:" + seenEntries +
" and size: " + (this.reader.getPosition() - startPosition));
+ if (fileInUse) {
+ return false;
+ }
// If we didn't get anything and the queue has an object, it means we
// hit the end of the file for sure
return seenEntries == 0 && processEndOfFile();
@@ -610,7 +626,7 @@
/**
* Do the shipping logic
*/
- protected void shipEdits() {
+ protected void shipEdits(boolean fileInUse) {
int sleepMultiplier = 1;
if (this.currentNbEntries == 0) {
LOG.warn("Was given 0 edits to ship");
@@ -630,7 +646,7 @@
Arrays.copyOf(this.entriesArray, currentNbEntries));
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered);
+ this.peerClusterZnode, this.position, queueRecovered, fileInUse);
this.lastLoggedPosition = this.position;
}
this.totalReplicatedEdits += currentNbEntries;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (revision 1391264)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (working copy)
@@ -26,6 +26,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
/**
* Interface that defines a replication source
@@ -38,6 +39,7 @@
* @param conf the configuration to use
* @param fs the file system to use
* @param manager the manager to use
+ * @param services the region server services
* @param stopper the stopper object for this region server
* @param replicating the status of the replication on this cluster
* @param peerClusterId the id of the peer cluster
@@ -46,6 +48,7 @@
public void init(final Configuration conf,
final FileSystem fs,
final ReplicationSourceManager manager,
+ final RegionServerServices services,
final Stoppable stopper,
final AtomicBoolean replicating,
final String peerClusterId) throws IOException;