diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
index 2ab9897..e231967 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* A cluster connection. Knows how to find the master, locate regions out on the cluster,
@@ -48,8 +47,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
* connections at a lower level.
*
*
HConnections are used by {@link HTable} mostly but also by
- * {@link HBaseAdmin}, {@link CatalogTracker},
- * and {@link ZooKeeperWatcher}. HConnection instances can be shared. Sharing
+ * {@link HBaseAdmin}, and {@link CatalogTracker}. HConnection instances can be shared. Sharing
* is usually what you want because rather than each HConnection instance
* having to do its own discovery of regions out on the cluster, instead, all
* clients get to share the one cache of locations. {@link HConnectionManager} does the
@@ -66,17 +64,6 @@ public interface HConnection extends Abortable, Closeable {
*/
public Configuration getConfiguration();
- /**
- * Retrieve ZooKeeperWatcher used by this connection.
- * @return ZooKeeperWatcher handle being used by the connection.
- * @throws IOException if a remote or network exception occurs
- * @deprecated Removed because it was a mistake exposing zookeeper in this
- * interface (ZooKeeper is an implementation detail).
- * Deprecated in HBase 0.94
- */
- @Deprecated
- public ZooKeeperWatcher getZooKeeperWatcher() throws IOException;
-
/** @return - true if the master server is running */
public boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException;
@@ -104,9 +91,9 @@ public interface HConnection extends Abortable, Closeable {
* @throws IOException if a remote or network exception occurs
*/
public boolean isTableAvailable(byte[] tableName) throws IOException;
-
+
/**
- * Use this api to check if the table has been created with the specified number of
+ * Use this api to check if the table has been created with the specified number of
* splitkeys which was used while creating the given table.
* Note : If this api is used after a table's region gets splitted, the api may return
* false.
@@ -202,7 +189,7 @@ public interface HConnection extends Abortable, Closeable {
*/
public List locateRegions(final byte[] tableName)
throws IOException;
-
+
/**
* Gets the locations of all regions in the specified table, tableName.
* @param tableName table to get regions of
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
index 715697e..d08ab95 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
@@ -1546,24 +1546,6 @@ public class HConnectionManager {
return serviceName + "@" + rsHostnamePort;
}
- @Override
- @Deprecated
- public ZooKeeperWatcher getZooKeeperWatcher()
- throws ZooKeeperConnectionException {
- canCloseZKW = false;
-
- try {
- return getKeepAliveZooKeeperWatcher();
- } catch (ZooKeeperConnectionException e){
- throw e;
- }catch (IOException e) {
- // Encapsulate exception to keep interface
- throw new ZooKeeperConnectionException(
- "Can't create a zookeeper connection", e);
- }
- }
-
-
private ZooKeeperKeepAliveConnection keepAliveZookeeper;
private int keepAliveZookeeperUserCount;
private boolean canCloseZKW = true;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperKeepAliveConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperKeepAliveConnection.java
index 01890cf..897ec19 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperKeepAliveConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperKeepAliveConnection.java
@@ -45,7 +45,9 @@ class ZooKeeperKeepAliveConnection extends ZooKeeperWatcher{
@Override
public void close() {
- ((HConnectionManager.HConnectionImplementation)abortable).releaseZooKeeperWatcher(this);
+ if (this.abortable != null) {
+ ((HConnectionManager.HConnectionImplementation)abortable).releaseZooKeeperWatcher(this);
+ }
}
void internalClose(){
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index f6c1f14..a78c503 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -68,6 +68,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// abortable in case of zk failure
protected Abortable abortable;
+ // Used if abortable is null
+ private boolean aborted = false;
// listeners to be notified
private final List listeners =
@@ -128,10 +130,15 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
Abortable abortable) throws ZooKeeperConnectionException, IOException {
this(conf, identifier, abortable, false);
}
+
/**
* Instantiate a ZooKeeper connection and watcher.
* @param identifier string that is passed to RecoverableZookeeper to be used as
* identifier for this instance. Use null for default.
+ * @param conf
+ * @param abortable Can be null if there is on error there is no host to abort: e.g. client
+ * context.
+ * @param canCreateBaseZNode
* @throws IOException
* @throws ZooKeeperConnectionException
*/
@@ -361,8 +368,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
"ZooKeeper, aborting");
// TODO: One thought is to add call to ZooKeeperListener so say,
// ZooKeeperNodeTracker can zero out its data values.
- if (this.abortable != null) this.abortable.abort(msg,
- new KeeperException.SessionExpiredException());
+ if (this.abortable != null) {
+ this.abortable.abort(msg, new KeeperException.SessionExpiredException());
+ }
break;
case ConnectedReadOnly:
@@ -444,12 +452,13 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
@Override
public void abort(String why, Throwable e) {
- this.abortable.abort(why, e);
+ if (this.abortable != null) this.abortable.abort(why, e);
+ else this.aborted = true;
}
@Override
public boolean isAborted() {
- return this.abortable.isAborted();
+ return this.abortable == null? this.aborted: this.abortable.isAborted();
}
/**
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java
deleted file mode 100644
index b4b38b0..0000000
--- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.coprocessor.example;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.Ignore;
-import org.junit.experimental.categories.Category;
-
-@Category(MediumTests.class)
-public class TestZooKeeperScanPolicyObserver {
- private static final Log LOG = LogFactory.getLog(TestZooKeeperScanPolicyObserver.class);
- private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static final byte[] F = Bytes.toBytes("fam");
- private static final byte[] Q = Bytes.toBytes("qual");
- private static final byte[] R = Bytes.toBytes("row");
-
- // @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- System.out.println("HERE!!!!!!!!");
- // Test we can first start the ZK cluster by itself
- Configuration conf = TEST_UTIL.getConfiguration();
- conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
- ZooKeeperScanPolicyObserver.class.getName());
- TEST_UTIL.startMiniZKCluster();
- TEST_UTIL.startMiniCluster();
- }
-
- // @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- // @Ignore @Test
- public void testScanPolicyObserver() throws Exception {
- byte[] tableName = Bytes.toBytes("testScanPolicyObserver");
- HTableDescriptor desc = new HTableDescriptor(tableName);
- HColumnDescriptor hcd = new HColumnDescriptor(F)
- .setMaxVersions(10)
- .setTimeToLive(1);
- desc.addFamily(hcd);
- TEST_UTIL.getHBaseAdmin().createTable(desc);
- HTable t = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
- long now = EnvironmentEdgeManager.currentTimeMillis();
-
- ZooKeeperWatcher zkw = HConnectionManager.getConnection(TEST_UTIL.getConfiguration())
- .getZooKeeperWatcher();
- ZooKeeper zk = zkw.getRecoverableZooKeeper().getZooKeeper();
- ZKUtil.createWithParents(zkw, ZooKeeperScanPolicyObserver.node);
- // let's say test last backup was 1h ago
- // using plain ZK here, because RecoverableZooKeeper add extra encoding to the data
- zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now - 3600*1000), -1);
-
- LOG.debug("Set time: "+Bytes.toLong(Bytes.toBytes(now - 3600*1000)));
-
- // sleep for 1s to give the ZK change a chance to reach the watcher in the observer.
- // TODO: Better to wait for the data to be propagated
- Thread.sleep(1000);
-
- long ts = now - 2000;
- Put p = new Put(R);
- p.add(F, Q, ts, Q);
- t.put(p);
- p = new Put(R);
- p.add(F, Q, ts+1, Q);
- t.put(p);
-
- // these two should be expired but for the override
- // (their ts was 2s in the past)
- Get g = new Get(R);
- g.setMaxVersions(10);
- Result r = t.get(g);
- // still there?
- assertEquals(2, r.size());
-
- TEST_UTIL.flush(tableName);
- TEST_UTIL.compact(tableName, true);
-
- g = new Get(R);
- g.setMaxVersions(10);
- r = t.get(g);
- // still there?
- assertEquals(2, r.size());
- zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now), -1);
- LOG.debug("Set time: "+now);
-
- TEST_UTIL.compact(tableName, true);
-
- g = new Get(R);
- g.setMaxVersions(10);
- r = t.get(g);
- // should be gone now
- assertEquals(0, r.size());
- t.close();
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 6625e87..ef8ef48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -117,7 +117,7 @@ public class Import {
extends TableMapper {
private Map cfRenameMap;
private UUID clusterId;
-
+
/**
* @param row The current table row key.
* @param value The columns.
@@ -175,20 +175,27 @@ public class Import {
Configuration conf = context.getConfiguration();
cfRenameMap = createCfRenameMap(conf);
filter = instantiateFilter(conf);
-
+ // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
+ ReplicationZookeeper zkHelper = null;
+ ZooKeeperWatcher zkw = null;
try {
HConnection connection = HConnectionManager.getConnection(conf);
- ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
- ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
- clusterId = zkHelper.getUUIDForCluster(zkw);
+ zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
+ zkHelper = new ReplicationZookeeper(connection, conf, zkw);
+ try {
+ this.clusterId = zkHelper.getUUIDForCluster(zkw);
+ } finally {
+ if (zkHelper != null) zkHelper.close();
+ }
} catch (ZooKeeperConnectionException e) {
LOG.error("Problem connecting to ZooKeper during task setup", e);
} catch (KeeperException e) {
LOG.error("Problem reading ZooKeeper data during task setup", e);
} catch (IOException e) {
LOG.error("Problem setting up task", e);
+ } finally {
+ if (zkw != null) zkw.close();
}
-
}
}
diff --git a/hbase-server/src/main/ruby/hbase/admin.rb b/hbase-server/src/main/ruby/hbase/admin.rb
index d5722b8..4c908be 100644
--- a/hbase-server/src/main/ruby/hbase/admin.rb
+++ b/hbase-server/src/main/ruby/hbase/admin.rb
@@ -32,7 +32,8 @@ module Hbase
@admin = org.apache.hadoop.hbase.client.HBaseAdmin.new(configuration)
connection = @admin.getConnection()
@conf = configuration
- @zk_wrapper = connection.getZooKeeperWatcher()
+ @zk_wrapper = org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.new(configuration,
+ "admin", nil)
zk = @zk_wrapper.getRecoverableZooKeeper().getZooKeeper()
@zk_main = org.apache.zookeeper.ZooKeeperMain.new(zk)
@formatter = formatter
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index c4cd185..6812133 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@@ -427,6 +428,23 @@ public class TestSplitTransactionOnCluster {
}
/**
+ * Noop Abortable implementation used below in tests.
+ */
+ static class UselessTestAbortable implements Abortable {
+ boolean aborted = false;
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.warn("ABORTED (But nothing to abort): why=" + why, e);
+ aborted = true;
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.aborted;
+ }
+ }
+
+ /**
* Verifies HBASE-5806. When splitting is partially done and the master goes down
* when the SPLIT node is in either SPLIT or SPLITTING state.
*
@@ -453,6 +471,8 @@ public class TestSplitTransactionOnCluster {
this.admin.setBalancerRunning(false, true);
// Turn off the meta scanner so it don't remove parent on us.
cluster.getMaster().setCatalogJanitorEnabled(false);
+ ZooKeeperWatcher zkw = new ZooKeeperWatcher(t.getConfiguration(),
+ "testMasterRestartWhenSplittingIsPartial", new UselessTestAbortable());
try {
// Add a bit of load up into the table so splittable.
TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY);
@@ -467,14 +487,11 @@ public class TestSplitTransactionOnCluster {
this.admin.split(hri.getRegionNameAsString());
checkAndGetDaughters(tableName);
// Assert the ephemeral node is up in zk.
- String path = ZKAssign.getNodeName(t.getConnection()
- .getZooKeeperWatcher(), hri.getEncodedName());
- Stat stats = t.getConnection().getZooKeeperWatcher()
- .getRecoverableZooKeeper().exists(path, false);
+ String path = ZKAssign.getNodeName(zkw, hri.getEncodedName());
+ Stat stats = zkw.getRecoverableZooKeeper().exists(path, false);
LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats="
+ stats);
- byte[] bytes = ZKAssign.getData(t.getConnection()
- .getZooKeeperWatcher(), hri.getEncodedName());
+ byte[] bytes = ZKAssign.getData(zkw, hri.getEncodedName());
RegionTransition rtd = RegionTransition.parseFrom(bytes);
// State could be SPLIT or SPLITTING.
assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT)
@@ -498,6 +515,7 @@ public class TestSplitTransactionOnCluster {
admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true);
t.close();
+ zkw.close();
}
}
@@ -526,6 +544,8 @@ public class TestSplitTransactionOnCluster {
this.admin.setBalancerRunning(false, true);
// Turn off the meta scanner so it don't remove parent on us.
cluster.getMaster().setCatalogJanitorEnabled(false);
+ ZooKeeperWatcher zkw = new ZooKeeperWatcher(t.getConfiguration(),
+ "testMasterRestartAtRegionSplitPendingCatalogJanitor", new UselessTestAbortable());
try {
// Add a bit of load up into the table so splittable.
TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY);
@@ -536,22 +556,17 @@ public class TestSplitTransactionOnCluster {
this.admin.split(hri.getRegionNameAsString());
checkAndGetDaughters(tableName);
// Assert the ephemeral node is up in zk.
- String path = ZKAssign.getNodeName(t.getConnection()
- .getZooKeeperWatcher(), hri.getEncodedName());
- Stat stats = t.getConnection().getZooKeeperWatcher()
- .getRecoverableZooKeeper().exists(path, false);
+ String path = ZKAssign.getNodeName(zkw, hri.getEncodedName());
+ Stat stats = zkw.getRecoverableZooKeeper().exists(path, false);
LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats="
+ stats);
- String node = ZKAssign.getNodeName(t.getConnection()
- .getZooKeeperWatcher(), hri.getEncodedName());
+ String node = ZKAssign.getNodeName(zkw, hri.getEncodedName());
Stat stat = new Stat();
- byte[] data = ZKUtil.getDataNoWatch(t.getConnection()
- .getZooKeeperWatcher(), node, stat);
+ byte[] data = ZKUtil.getDataNoWatch(zkw, node, stat);
// ZKUtil.create
for (int i=0; data != null && i<60; i++) {
Thread.sleep(1000);
- data = ZKUtil.getDataNoWatch(t.getConnection().getZooKeeperWatcher(),
- node, stat);
+ data = ZKUtil.getDataNoWatch(zkw, node, stat);
}
assertNull("Waited too long for ZK node to be removed: "+node, data);
@@ -571,6 +586,7 @@ public class TestSplitTransactionOnCluster {
this.admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true);
t.close();
+ zkw.close();
}
}