diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
index 2ab9897..875da60 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService;
 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 /**
  * A cluster connection. Knows how to find the master, locate regions out on the cluster,
@@ -66,17 +65,6 @@ public interface HConnection extends Abortable, Closeable {
    */
   public Configuration getConfiguration();
 
-  /**
-   * Retrieve ZooKeeperWatcher used by this connection.
-   * @return ZooKeeperWatcher handle being used by the connection.
-   * @throws IOException if a remote or network exception occurs
-   * @deprecated Removed because it was a mistake exposing zookeeper in this
-   * interface (ZooKeeper is an implementation detail).
-   * Deprecated in HBase 0.94
-   */
-  @Deprecated
-  public ZooKeeperWatcher getZooKeeperWatcher() throws IOException;
-
   /** @return - true if the master server is running */
   public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
index 715697e..d08ab95 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
@@ -1546,24 +1546,6 @@ public class HConnectionManager {
       return serviceName + "@" + rsHostnamePort;
     }
 
-    @Override
-    @Deprecated
-    public ZooKeeperWatcher getZooKeeperWatcher()
-      throws ZooKeeperConnectionException {
-      canCloseZKW = false;
-
-      try {
-        return getKeepAliveZooKeeperWatcher();
-      } catch (ZooKeeperConnectionException e){
-        throw e;
-      }catch (IOException e) {
-        // Encapsulate exception to keep interface
-        throw new ZooKeeperConnectionException(
-          "Can't create a zookeeper connection", e);
-      }
-    }
-
     private ZooKeeperKeepAliveConnection keepAliveZookeeper;
     private int keepAliveZookeeperUserCount;
     private boolean canCloseZKW = true;
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java
deleted file mode 100644
index b4b38b0..0000000
--- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.coprocessor.example;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.Ignore;
-import org.junit.experimental.categories.Category;
-
-@Category(MediumTests.class)
-public class TestZooKeeperScanPolicyObserver {
-  private static final Log LOG = LogFactory.getLog(TestZooKeeperScanPolicyObserver.class);
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static final byte[] F = Bytes.toBytes("fam");
-  private static final byte[] Q = Bytes.toBytes("qual");
-  private static final byte[] R = Bytes.toBytes("row");
-
-  // @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    System.out.println("HERE!!!!!!!!");
-    // Test we can first start the ZK cluster by itself
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
-        ZooKeeperScanPolicyObserver.class.getName());
-    TEST_UTIL.startMiniZKCluster();
-    TEST_UTIL.startMiniCluster();
-  }
-
-  // @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  // @Ignore @Test
-  public void testScanPolicyObserver() throws Exception {
-    byte[] tableName = Bytes.toBytes("testScanPolicyObserver");
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    HColumnDescriptor hcd = new HColumnDescriptor(F)
-        .setMaxVersions(10)
-        .setTimeToLive(1);
-    desc.addFamily(hcd);
-    TEST_UTIL.getHBaseAdmin().createTable(desc);
-    HTable t = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
-    long now = EnvironmentEdgeManager.currentTimeMillis();
-
-    ZooKeeperWatcher zkw = HConnectionManager.getConnection(TEST_UTIL.getConfiguration())
-        .getZooKeeperWatcher();
-    ZooKeeper zk = zkw.getRecoverableZooKeeper().getZooKeeper();
-    ZKUtil.createWithParents(zkw, ZooKeeperScanPolicyObserver.node);
-    // let's say test last backup was 1h ago
-    // using plain ZK here, because RecoverableZooKeeper add extra encoding to the data
-    zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now - 3600*1000), -1);
-
-    LOG.debug("Set time: "+Bytes.toLong(Bytes.toBytes(now - 3600*1000)));
-
-    // sleep for 1s to give the ZK change a chance to reach the watcher in the observer.
-    // TODO: Better to wait for the data to be propagated
-    Thread.sleep(1000);
-
-    long ts = now - 2000;
-    Put p = new Put(R);
-    p.add(F, Q, ts, Q);
-    t.put(p);
-    p = new Put(R);
-    p.add(F, Q, ts+1, Q);
-    t.put(p);
-
-    // these two should be expired but for the override
-    // (their ts was 2s in the past)
-    Get g = new Get(R);
-    g.setMaxVersions(10);
-    Result r = t.get(g);
-    // still there?
-    assertEquals(2, r.size());
-
-    TEST_UTIL.flush(tableName);
-    TEST_UTIL.compact(tableName, true);
-
-    g = new Get(R);
-    g.setMaxVersions(10);
-    r = t.get(g);
-    // still there?
-    assertEquals(2, r.size());
-    zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now), -1);
-    LOG.debug("Set time: "+now);
-
-    TEST_UTIL.compact(tableName, true);
-
-    g = new Get(R);
-    g.setMaxVersions(10);
-    r = t.get(g);
-    // should be gone now
-    assertEquals(0, r.size());
-    t.close();
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 6625e87..bc4d6e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
@@ -117,7 +118,7 @@ public class Import {
   extends TableMapper {
     private Map cfRenameMap;
     private UUID clusterId;
-    
+
     /**
      * @param row The current table row key.
      * @param value The columns.
@@ -175,21 +176,50 @@ public class Import {
       Configuration conf = context.getConfiguration();
       cfRenameMap = createCfRenameMap(conf);
       filter = instantiateFilter(conf);
-      
+      // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
+      ReplicationZookeeper zkHelper = null;
+      ZooKeeperWatcher zkw = null;
       try {
         HConnection connection = HConnectionManager.getConnection(conf);
-        ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
-        ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
-        clusterId = zkHelper.getUUIDForCluster(zkw);
+        zkw = new ZooKeeperWatcher(conf,
+          context.getTaskAttemptID().toString(), new Abortable() {
+          private boolean aborted = false;
+
+          @Override
+          public void abort(String why, Throwable e) {
+            LOG.warn("ABORTED CALLED (But nothing to abort): why=" + why, e);
+            this.aborted = true;
+          }
+
+          @Override
+          public boolean isAborted() {
+            return this.aborted;
+          }
+        });
+        zkHelper = new ReplicationZookeeper(connection, conf, zkw);
+        try {
+          this.clusterId = zkHelper.getUUIDForCluster(zkw);
+        } finally {
+          if (zkHelper != null) zkHelper.close();
+        }
       } catch (ZooKeeperConnectionException e) {
         LOG.error("Problem connecting to ZooKeper during task setup", e);
       } catch (KeeperException e) {
         LOG.error("Problem reading ZooKeeper data during task setup", e);
       } catch (IOException e) {
         LOG.error("Problem setting up task", e);
-      }
+      } finally {
+        if (zkw != null) zkw.close();
+      }
+    }
+    @Override
+    protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
+      throws IOException, InterruptedException {
+      super.cleanup(context);
     }
+
+
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index c4cd185..6812133 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -427,6 +428,23 @@ public class TestSplitTransactionOnCluster {
   }
 
   /**
+   * Noop Abortable implementation used below in tests.
+   */
+  static class UselessTestAbortable implements Abortable {
+    boolean aborted = false;
+    @Override
+    public void abort(String why, Throwable e) {
+      LOG.warn("ABORTED (But nothing to abort): why=" + why, e);
+      aborted = true;
+    }
+
+    @Override
+    public boolean isAborted() {
+      return this.aborted;
+    }
+  }
+
+  /**
    * Verifies HBASE-5806. When splitting is partially done and the master goes down
    * when the SPLIT node is in either SPLIT or SPLITTING state.
    *
@@ -453,6 +471,8 @@ public class TestSplitTransactionOnCluster {
     this.admin.setBalancerRunning(false, true);
     // Turn off the meta scanner so it don't remove parent on us.
     cluster.getMaster().setCatalogJanitorEnabled(false);
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(t.getConfiguration(),
+      "testMasterRestartWhenSplittingIsPartial", new UselessTestAbortable());
     try {
       // Add a bit of load up into the table so splittable.
       TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY);
@@ -467,14 +487,11 @@
       this.admin.split(hri.getRegionNameAsString());
       checkAndGetDaughters(tableName);
       // Assert the ephemeral node is up in zk.
-      String path = ZKAssign.getNodeName(t.getConnection()
-        .getZooKeeperWatcher(), hri.getEncodedName());
-      Stat stats = t.getConnection().getZooKeeperWatcher()
-        .getRecoverableZooKeeper().exists(path, false);
+      String path = ZKAssign.getNodeName(zkw, hri.getEncodedName());
+      Stat stats = zkw.getRecoverableZooKeeper().exists(path, false);
       LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats="
         + stats);
-      byte[] bytes = ZKAssign.getData(t.getConnection()
-        .getZooKeeperWatcher(), hri.getEncodedName());
+      byte[] bytes = ZKAssign.getData(zkw, hri.getEncodedName());
       RegionTransition rtd = RegionTransition.parseFrom(bytes);
       // State could be SPLIT or SPLITTING.
       assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT)
@@ -498,6 +515,7 @@
       admin.setBalancerRunning(true, false);
       cluster.getMaster().setCatalogJanitorEnabled(true);
       t.close();
+      zkw.close();
     }
   }
 
@@ -526,6 +544,8 @@
     this.admin.setBalancerRunning(false, true);
     // Turn off the meta scanner so it don't remove parent on us.
     cluster.getMaster().setCatalogJanitorEnabled(false);
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(t.getConfiguration(),
+      "testMasterRestartAtRegionSplitPendingCatalogJanitor", new UselessTestAbortable());
     try {
       // Add a bit of load up into the table so splittable.
       TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY);
@@ -536,22 +556,17 @@
       this.admin.split(hri.getRegionNameAsString());
       checkAndGetDaughters(tableName);
       // Assert the ephemeral node is up in zk.
-      String path = ZKAssign.getNodeName(t.getConnection()
-        .getZooKeeperWatcher(), hri.getEncodedName());
-      Stat stats = t.getConnection().getZooKeeperWatcher()
-        .getRecoverableZooKeeper().exists(path, false);
+      String path = ZKAssign.getNodeName(zkw, hri.getEncodedName());
+      Stat stats = zkw.getRecoverableZooKeeper().exists(path, false);
       LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats="
         + stats);
-      String node = ZKAssign.getNodeName(t.getConnection()
-        .getZooKeeperWatcher(), hri.getEncodedName());
+      String node = ZKAssign.getNodeName(zkw, hri.getEncodedName());
       Stat stat = new Stat();
-      byte[] data = ZKUtil.getDataNoWatch(t.getConnection()
-        .getZooKeeperWatcher(), node, stat);
+      byte[] data = ZKUtil.getDataNoWatch(zkw, node, stat);
       // ZKUtil.create
       for (int i=0; data != null && i<60; i++) {
         Thread.sleep(1000);
-        data = ZKUtil.getDataNoWatch(t.getConnection().getZooKeeperWatcher(),
-          node, stat);
+        data = ZKUtil.getDataNoWatch(zkw, node, stat);
       }
       assertNull("Waited too long for ZK node to be removed: "+node, data);
@@ -571,6 +586,7 @@
       this.admin.setBalancerRunning(true, false);
       cluster.getMaster().setCatalogJanitorEnabled(true);
       t.close();
+      zkw.close();
     }
   }
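
Note on the migration pattern: with HConnection.getZooKeeperWatcher() gone, each caller that still needs ZooKeeper access owns its own watcher, as the patch does in Import's setup() and in the split tests. A minimal sketch of that caller-side pattern follows; the class name OwnWatcherSketch and the session identifier "own-watcher-sketch" are illustrative only and not part of the patch.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Abortable;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

    public class OwnWatcherSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // The caller now owns the watcher lifecycle: create it with a local
        // no-op Abortable (mirroring UselessTestAbortable above) and close it when done.
        ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "own-watcher-sketch",
            new Abortable() {
              private boolean aborted = false;
              @Override
              public void abort(String why, Throwable e) {
                this.aborted = true;
              }
              @Override
              public boolean isAborted() {
                return this.aborted;
              }
            });
        try {
          // ... read znodes via ZKUtil helpers, as the tests above do ...
        } finally {
          zkw.close();
        }
      }
    }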