diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 1bfd9a6..522971b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -110,7 +110,6 @@ import java.util.concurrent.atomic.AtomicInteger; * Does RPC against a cluster. Manages connections per regionserver in the cluster. *
See HBaseServer
*/
-@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@InterfaceAudience.Private
public class RpcClient {
public static final Log LOG = LogFactory.getLog(RpcClient.class);
@@ -374,13 +373,13 @@ public class RpcClient {
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
- @SuppressWarnings("SynchronizeOnNonFinalField")
protected class Connection extends Thread {
private ConnectionHeader header; // connection header
protected ConnectionId remoteId;
protected Socket socket = null; // connected socket
protected DataInputStream in;
- protected DataOutputStream out; // Warning: can be locked inside a class level lock.
+ protected DataOutputStream out;
+ private Object outLock = new Object();
private InetSocketAddress server; // server ip:port
private String serverPrincipal; // server's krb5 principal name
private AuthMethod authMethod; // authentication method
@@ -972,7 +971,9 @@ public class RpcClient {
}
}
this.in = new DataInputStream(new BufferedInputStream(inStream));
- this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+ synchronized (this.outLock) {
+ this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+ }
// Now write out the connection header
writeConnectionHeader();
@@ -1021,7 +1022,7 @@ public class RpcClient {
* Write the connection header.
*/
private synchronized void writeConnectionHeader() throws IOException {
- synchronized (this.out) {
+ synchronized (this.outLock) {
this.out.writeInt(this.header.getSerializedSize());
this.header.writeTo(this.out);
this.out.flush();
@@ -1042,8 +1043,8 @@ public class RpcClient {
}
// close the streams and therefore the socket
- if (this.out != null) {
- synchronized(this.out) {
+ synchronized(this.outLock) {
+ if (this.out != null) {
IOUtils.closeStream(out);
this.out = null;
}
@@ -1105,7 +1106,7 @@ public class RpcClient {
// know where we stand, we have to close the connection.
checkIsOpen();
IOException writeException = null;
- synchronized (this.out) {
+ synchronized (this.outLock) {
if (Thread.interrupted()) throw new InterruptedIOException();
calls.put(call.id, call); // We put first as we don't want the connection to become idle.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java
index 6db317c..77e2cd0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java
@@ -110,7 +110,9 @@ public class Sleeper {
woke = (woke == -1)? System.currentTimeMillis(): woke;
waitTime = this.period - (woke - startTime);
}
- triggerWake = false;
+ synchronized(sleepLock) {
+ triggerWake = false;
+ }
}
/**
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index cae7580..695f1f5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -2280,6 +2280,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
@Override
protected Object clone() throws CloneNotSupportedException {
+ super.clone();
return new KVComparator();
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
index c4a7286..c4fcec4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
@@ -46,9 +46,6 @@ public class SpanReceiverHost {
}
public static SpanReceiverHost getInstance(Configuration conf) {
- if (SingletonHolder.INSTANCE.host != null) {
- return SingletonHolder.INSTANCE.host;
- }
synchronized (SingletonHolder.INSTANCE.lock) {
if (SingletonHolder.INSTANCE.host != null) {
return SingletonHolder.INSTANCE.host;
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/impl/JmxCacheBuster.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/impl/JmxCacheBuster.java
index c851bc4..5d83150 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/impl/JmxCacheBuster.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/impl/JmxCacheBuster.java
@@ -52,9 +52,8 @@ public class JmxCacheBuster {
public static void clearJmxCache() {
// If there are more than 100 ms before the executor will run then everything should be merged.
- if (fut == null || (!fut.isDone() && fut.getDelay(TimeUnit.MILLISECONDS) > 100)) return;
-
synchronized (lock) {
+ if (fut == null || (!fut.isDone() && fut.getDelay(TimeUnit.MILLISECONDS) > 100)) return;
fut = executor.getExecutor().schedule(new JmxCacheBusterRunnable(), 5, TimeUnit.SECONDS);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
index a8db472..d7fcb3d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
@@ -464,9 +464,13 @@ public class LocalHBaseCluster {
LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
cluster.startup();
Admin admin = new HBaseAdmin(conf);
- HTableDescriptor htd =
- new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
- admin.createTable(htd);
+ try {
+ HTableDescriptor htd =
+ new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
+ admin.createTable(htd);
+ } finally {
+ admin.close();
+ }
cluster.shutdown();
}
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index e2d02a1..0fb5c59 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -264,7 +264,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
// might miss the watch-trigger that creation of RESCAN node provides.
// Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
// therefore this behavior is safe.
- SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), this.recoveryMode);
+ SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), getRecoveryMode());
this.watcher
.getRecoverableZooKeeper()
.getZooKeeper()
@@ -424,7 +424,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
}
private void createNode(String path, Long retry_count) {
- SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), this.recoveryMode);
+ SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode());
ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
retry_count);
SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
@@ -757,12 +757,12 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
}
@Override
- public boolean isReplaying() {
+ public synchronized boolean isReplaying() {
return this.recoveryMode == RecoveryMode.LOG_REPLAY;
}
@Override
- public boolean isSplitting() {
+ public synchronized boolean isSplitting() {
return this.recoveryMode == RecoveryMode.LOG_SPLITTING;
}
@@ -774,15 +774,19 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
*/
@Override
public void setRecoveryMode(boolean isForInitialization) throws IOException {
- if (this.isDrainingDone) {
- // when there is no outstanding splitlogtask after master start up, we already have up to date
- // recovery mode
- return;
+ synchronized(this) {
+ if (this.isDrainingDone) {
+ // when there is no outstanding splitlogtask after master start up, we already have up to date
+ // recovery mode
+ return;
+ }
}
if (this.watcher == null) {
// when watcher is null(testing code) and recovery mode can only be LOG_SPLITTING
- this.isDrainingDone = true;
- this.recoveryMode = RecoveryMode.LOG_SPLITTING;
+ synchronized(this) {
+ this.isDrainingDone = true;
+ this.recoveryMode = RecoveryMode.LOG_SPLITTING;
+ }
return;
}
boolean hasSplitLogTask = false;
@@ -877,7 +881,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
try {
// blocking zk call but this is done from the timeout thread
SplitLogTask slt =
- new SplitLogTask.Unassigned(this.details.getServerName(), this.recoveryMode);
+ new SplitLogTask.Unassigned(this.details.getServerName(), getRecoveryMode());
if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
LOG.debug("failed to resubmit task " + path + " version changed");
return false;
@@ -1105,7 +1109,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
}
@Override
- public RecoveryMode getRecoveryMode() {
+ public synchronized RecoveryMode getRecoveryMode() {
return recoveryMode;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/errorhandling/TimeoutExceptionInjector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/errorhandling/TimeoutExceptionInjector.java
index 6dccd3d..43dd68d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/errorhandling/TimeoutExceptionInjector.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/errorhandling/TimeoutExceptionInjector.java
@@ -23,7 +23,6 @@ import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@@ -80,14 +79,15 @@ public class TimeoutExceptionInjector {
* For all time forward, do not throw an error because the process has completed.
*/
public void complete() {
- // warn if the timer is already marked complete. This isn't going to be thread-safe, but should
- // be good enough and its not worth locking just for a warning.
- if (this.complete) {
- LOG.warn("Timer already marked completed, ignoring!");
- return;
- }
- LOG.debug("Marking timer as complete - no error notifications will be received for this timer.");
synchronized (this.timerTask) {
+ if (this.complete) {
+ LOG.warn("Timer already marked completed, ignoring!");
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Marking timer as complete - no error notifications will be received for " +
+ "this timer.");
+ }
this.complete = true;
}
this.timer.cancel();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 01d4893..c091312 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -129,6 +129,7 @@ public class VerifyReplication extends Configured implements Tool {
ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
+ // TODO: This HTable doesn't get closed. Fix!
Table replicatedTable = new HTable(peerConf, tableName);
scan.setStartRow(value.getRow());
scan.setStopRow(tableSplit.getEndRow());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
index 25e8a43..4131f92 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
@@ -194,7 +194,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
clusterMap.remove(masterServerName);
}
- boolean emptyRegionServerPresent = false;
long startTime = System.currentTimeMillis();
// construct a Cluster object with clusterMap and rest of the
@@ -256,10 +255,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
numTaken++;
if (numTaken >= numToOffload) break;
- // fetch in alternate order if there is new region server
- if (emptyRegionServerPresent) {
- fetchFromTail = !fetchFromTail;
- }
}
serverBalanceInfo.put(sal.getServerName(),
new BalanceInfo(numToOffload, (-1)*numTaken));
@@ -303,9 +298,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
if (numToTake == 0) continue;
addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
- if (emptyRegionServerPresent) {
- fetchFromTail = !fetchFromTail;
- }
underloadedServers.put(si, numToTake-1);
cnt++;
@@ -381,9 +373,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
addRegionPlan(regionsToMove, fetchFromTail,
server.getKey().getServerName(), regionsToReturn);
numTaken++;
- if (emptyRegionServerPresent) {
- fetchFromTail = !fetchFromTail;
- }
}
}
@@ -401,9 +390,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
}
addRegionPlan(regionsToMove, fetchFromTail,
server.getKey().getServerName(), regionsToReturn);
- if (emptyRegionServerPresent) {
- fetchFromTail = !fetchFromTail;
- }
if (regionsToMove.isEmpty()) {
break;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java
index 019a61f..3804a6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java
@@ -18,22 +18,11 @@
package org.apache.hadoop.hbase.quotas;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
-import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import com.google.common.collect.Sets;
-
/**
* In-Memory state of table or namespace quotas
*/
@@ -53,7 +42,7 @@ public class QuotaState {
lastUpdate = updateTs;
}
- public long getLastUpdate() {
+ public synchronized long getLastUpdate() {
return lastUpdate;
}
@@ -64,7 +53,7 @@ public class QuotaState {
@Override
public synchronized String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("QuotaState(ts=" + lastUpdate);
+ builder.append("QuotaState(ts=" + getLastUpdate());
if (isBypass()) {
builder.append(" bypass");
} else {
@@ -119,4 +108,12 @@ public class QuotaState {
lastQuery = EnvironmentEdgeManager.currentTime();
return globalLimiter;
}
-}
+
+ /**
+ * Return the limiter associated with this quota without updating internal last query stats
+ * @return the quota limiter
+ */
+ synchronized QuotaLimiter getGlobalLimiterWithoutUpdatingLastQuery() {
+ return globalLimiter;
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
index 73e1561..19fce22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
@@ -23,25 +23,18 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
-import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import com.google.common.collect.Sets;
-
/**
* In-Memory state of the user quotas
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class UserQuotaState extends QuotaState {
- private static final Log LOG = LogFactory.getLog(UserQuotaState.class);
-
private Map