Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (revision 1488991) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (working copy) @@ -1650,18 +1650,8 @@ private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink; private boolean hasEditsInDisablingOrDisabledTables = false; - private Configuration sinkConf; public LogReplayOutputSink(int numWriters) { super(numWriters); - // set a smaller retries to fast fail otherwise splitlogworker could be blocked for - // quite a while inside HConnection layer. The worker won't available for other - // tasks even after current task is preempted after a split task times out. - sinkConf = HBaseConfiguration.create(conf); - sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2); - sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT / 2); - sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); - this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT); this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters); @@ -1809,9 +1799,6 @@ // skip current kv if column family doesn't exist anymore or already flushed continue; } - } else { - LOG.warn("Can't find store max sequence ids map for region:" - + loc.getRegionInfo().getEncodedName()); } } @@ -1861,21 +1848,29 @@ */ private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn, byte[] table, byte[] row, String originalEncodedRegionName) throws IOException { + + // fetch location from cache HRegionLocation loc = onlineRegions.get(originalEncodedRegionName); if(loc != null) return loc; - + // fetch location from .META. loc = hconn.getRegionLocation(table, row, false); if (loc == null) { throw new IOException("Can't locate location for row:" + Bytes.toString(row) + " of table:" + Bytes.toString(table)); } + // check if current row moves to a different region due to region merge/split + if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) { + // originalEncodedRegionName should have already flushed + lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE); + HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName()); + if (tmpLoc != null) return tmpLoc; + } Long lastFlushedSequenceId = -1l; loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut); Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo() .getEncodedName()); - onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc); // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will // update the value for the region RegionStoreSequenceIds ids = @@ -1901,7 +1896,8 @@ LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName() + " because it's not in recovering."); } - + + onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc); return loc; } @@ -2068,6 +2064,7 @@ for (byte[] tableName : this.tableNameToHConnectionMap.keySet()) { HConnection hconn = this.tableNameToHConnectionMap.get(tableName); try { + hconn.clearRegionCache(); hconn.close(); } catch (IOException ioe) { result.add(ioe); @@ -2128,7 +2125,7 @@ synchronized (this.tableNameToHConnectionMap) { hconn = this.tableNameToHConnectionMap.get(tableName); if (hconn == null) { - hconn = HConnectionManager.createConnection(sinkConf); + hconn = HConnectionManager.getConnection(conf); this.tableNameToHConnectionMap.put(tableName, hconn); } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (revision 1488991) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (working copy) @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.NotServingRegionException; @@ -92,6 +93,7 @@ private boolean workerInGrabTask = false; private final int report_period; private RegionServerServices server = null; + private Configuration conf = null; public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server, TaskExecutor splitTaskExecutor) { @@ -101,6 +103,7 @@ this.splitTaskExecutor = splitTaskExecutor; report_period = conf.getInt("hbase.splitlog.report.period", conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3); + this.conf = conf; } public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName, @@ -110,6 +113,7 @@ this.splitTaskExecutor = splitTaskExecutor; report_period = conf.getInt("hbase.splitlog.report.period", conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3); + this.conf = conf; } public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf, @@ -165,6 +169,8 @@ try { LOG.info("SplitLogWorker " + this.serverName + " starting"); this.watcher.registerListener(this); + // initialize a new connection for splitlogworker configuration + HConnectionManager.getConnection(conf); int res; // wait for master to create the splitLogZnode res = -1; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1488991) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -1566,7 +1566,14 @@ this.rpcServer.start(); // Create the log splitting worker and start it - this.splitLogWorker = new SplitLogWorker(this.zooKeeper, this.getConfiguration(), this, this); + // set a smaller retries to fast fail otherwise splitlogworker could be blocked for + // quite a while inside HConnection layer. The worker won't available for other + // tasks even after current task is preempted after a split task times out. + Configuration sinkConf = HBaseConfiguration.create(conf); + sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2); + sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); + this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this); splitLogWorker.start(); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java (revision 1488991) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java (working copy) @@ -261,7 +261,9 @@ RegionState regionState = new RegionState( hri, state, System.currentTimeMillis(), newServerName); RegionState oldState = regionStates.put(regionName, regionState); - LOG.info("Region " + hri + " transitioned from " + oldState + " to " + regionState); + if (oldState == null || oldState.getState() != regionState.getState()) { + LOG.info("Region " + hri + " transitioned from " + oldState + " to " + regionState); + } if (state != State.SPLITTING && (newServerName != null || (state != State.PENDING_CLOSE && state != State.CLOSING))) { regionsInTransition.put(regionName, regionState);