diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ConsensusProvider.java hbase-client/src/main/java/org/apache/hadoop/hbase/ConsensusProvider.java index 09870d2..411cc93 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/ConsensusProvider.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ConsensusProvider.java @@ -55,4 +55,10 @@ public interface ConsensusProvider { * @return instance of Server consensus runs within */ Server getServer(); + + /** + * Returns implementation of TableStateManager. + * @throws java.lang.InterruptedException if operation is interrupted + */ + TableStateManager getTableStateManager() throws InterruptedException; } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/TableStateManager.java hbase-client/src/main/java/org/apache/hadoop/hbase/TableStateManager.java new file mode 100644 index 0000000..29b5136 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/TableStateManager.java @@ -0,0 +1,198 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.ConsensusException; + +import java.io.InterruptedIOException; +import java.util.Set; + +/** + * Helper class for table state management for use by AssignmentManager. + * Depending on implementation, fetches information from HBase system table, + * local data store, ZooKeeper ensemble or somewhere else. + * Read-only clients other than AssignmentManager interested in learning + * table state can use the read-only utility methods in {@link TableStateReader}. + */ +@InterfaceAudience.Private +public interface TableStateManager { + + /** + * Sets the specified table as DISABLED. + * Fails silently if the table is already disabled. + * @param tableName table to be disabled + * @throws ConsensusException if error happened in underlying coordination engine + */ + void setDisabledTable(TableName tableName) throws ConsensusException; + + /** + * Sets the specified table as DISABLING. + * Fails silently if the table is already disabled. + * @param tableName table to be set as disabling + * @throws ConsensusException if error happened in underlying coordination engine + */ + void setDisablingTable(TableName tableName) throws ConsensusException; + + /** + * Sets the specified table as ENABLING. + * Fails silently if the table is already disabled. + * @param tableName table to be set as enabling + * @throws ConsensusException if error happened in underlying coordination engine + */ + void setEnablingTable(TableName tableName) throws ConsensusException; + + /** + * Sets the specified table as ENABLING atomically. + * If the table is already in ENABLING state, no operation is performed + * @param tableName table to be set as enabling + * @throws ConsensusException if error happened in underlying coordination engine + * @return if the operation succeeds or not + */ + boolean checkAndSetEnablingTable(TableName tableName) throws ConsensusException; + + /** + * Sets the specified table as ENABLING atomically + * If the table isn't in DISABLED state, no operation is performed + * @param tableName table to be set as enabling + * @throws ConsensusException if error happened in underlying coordination engine + * @return if the operation succeeds or not + */ + boolean checkDisabledAndSetEnablingTable(TableName tableName) throws ConsensusException; + + /** + * Sets the specified table as DISABLING atomically + * If the table isn't in ENABLED state, no operation is performed + * @param tableName table to be table to be set as disabling + * @throws ConsensusException if error happened in underlying coordination engine + * @return if the operation succeeds or not + */ + boolean checkEnabledAndSetDisablingTable(TableName tableName) throws ConsensusException; + + /** + * @param + * @return true if table is disabled, false otherwise + */ + boolean isDisabledTable(TableName tableName); + + /** + * @return true if table is disabling, false otherwise + */ + boolean isDisablingTable(TableName tableName); + + /** + * @return true if table is enabling, false otherwise + */ + boolean isEnablingTable(TableName tableName); + + /** + * @return true if table is enabled, false otherwise + */ + boolean isEnabledTable(TableName tableName); + + /** + * @return true if table is disabling OR disabled, false otherwise. + */ + boolean isDisablingOrDisabledTable(TableName tableName); + + /** + * @return true if table is enabling OR enabled, false otherwise. + */ + boolean isEnablingOrEnabledTable(TableName tableName); + + /** + * @return true if table is enabled or disabling, false otherwise. + */ + boolean isEnabledOrDisablingTable(TableName tableName); + + /** + * @return true if table is disabled or enabling, false otherwise. + */ + boolean isDisabledOrEnablingTable(TableName tableName); + + /** + * Mark table as deleted. Fails silently if the + * table is not currently marked as disabled. + * @param tableName table to be deleted + * @throws ConsensusException if error happened in underlying coordination engine + */ + void setDeletedTable(TableName tableName) throws ConsensusException; + + /** + * Forcibly mark table as ENABLED. + * + * @param tableName table to be marked enabled + * @throws ConsensusException if error happened in underlying coordination engine + */ + void setEnabledTable(TableName tableName) throws ConsensusException; + + /** + * Check if table is present. + * + * @param tableName table we're checking + * @return true if the table is present + */ + boolean isTablePresent(TableName tableName); + + /** + * Gets a list of all the tables set as disabled. + * @return Set of disabled tables, empty Set if none + */ + Set getDisabledTables(); + + /** + * Gets a list of all the tables set as disabling. + * @return Set of disabled tables, empty Set if none + * @throws ConsensusException if error happened in underlying coordination engine + * @throws InterruptedException if call is interrupted + */ + Set getDisablingTables() + throws ConsensusException, InterruptedIOException; + + /** + * Gets a list of all the tables set as enabling. + * @return Set of disabled tables, empty Set if none + * @throws ConsensusException if error happened in underlying coordination engine + * @throws InterruptedException if call is interrupted + */ + Set getEnablingTables() + throws ConsensusException, InterruptedIOException; + + /** + * Gets a list of all the tables set as disabled or disabling. + * @return Set of disabled tables, empty Set if none + * @throws ConsensusException if error happened in underlying coordination engine + * @throws InterruptedException if call is interrupted + */ + Set getDisabledOrDisablingTables() + throws ConsensusException, InterruptedIOException; + + /** + * If the table is found in ENABLING state the in-memory state is removed. This + * helps in cases where CreateTable is to be retried by the client in case of + * failures. If deletePermanentState is true - the flag kept permanently is + * also reset. + * + * @param tableName table we're working on + * @param deletePermanentState if true, reset the permanent flag + * @throws ConsensusException if error happened in underlying coordination engine + */ + void removeEnablingTable(TableName tableName, boolean deletePermanentState) + throws ConsensusException; +} diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConsensusException.java hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConsensusException.java new file mode 100644 index 0000000..0703cb2 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConsensusException.java @@ -0,0 +1,45 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Thrown by operations requiring coordination when internal error + * within coordination engine occurs. + */ +@InterfaceAudience.Private +@SuppressWarnings("serial") +public class ConsensusException extends HBaseException { + public ConsensusException() { + super(); + } + + public ConsensusException(final String message) { + super(message); + } + + public ConsensusException(final String message, final Throwable t) { + super(message, t); + } + + public ConsensusException(final Throwable t) { + super(t); + } +} diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java deleted file mode 100644 index 36e6b89..0000000 --- hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java +++ /dev/null @@ -1,406 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.zookeeper; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; -import org.apache.zookeeper.KeeperException; - -import java.io.InterruptedIOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * Helper class for table state tracking for use by AssignmentManager. - * Reads, caches and sets state up in zookeeper. If multiple read/write - * clients, will make for confusion. Read-only clients other than - * AssignmentManager interested in learning table state can use the - * read-only utility methods in {@link ZKTableReadOnly}. - * - *

To save on trips to the zookeeper ensemble, internally we cache table - * state. - */ -@InterfaceAudience.Private -public class ZKTable { - // A znode will exist under the table directory if it is in any of the - // following states: {@link TableState#ENABLING} , {@link TableState#DISABLING}, - // or {@link TableState#DISABLED}. If {@link TableState#ENABLED}, there will - // be no entry for a table in zk. Thats how it currently works. - - private static final Log LOG = LogFactory.getLog(ZKTable.class); - private final ZooKeeperWatcher watcher; - - /** - * Cache of what we found in zookeeper so we don't have to go to zk ensemble - * for every query. Synchronize access rather than use concurrent Map because - * synchronization needs to span query of zk. - */ - private final Map cache = - new HashMap(); - - // TODO: Make it so always a table znode. Put table schema here as well as table state. - // Have watcher on table znode so all are notified of state or schema change. - - public ZKTable(final ZooKeeperWatcher zkw) throws KeeperException, InterruptedException { - super(); - this.watcher = zkw; - populateTableStates(); - } - - /** - * Gets a list of all the tables set as disabled in zookeeper. - * @throws KeeperException - */ - private void populateTableStates() - throws KeeperException, InterruptedException { - synchronized (this.cache) { - List children = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.tableZNode); - if (children == null) return; - for (String child: children) { - TableName tableName = TableName.valueOf(child); - ZooKeeperProtos.Table.State state = ZKTableReadOnly.getTableState(this.watcher, tableName); - if (state != null) this.cache.put(tableName, state); - } - } - } - - /** - * Sets the specified table as DISABLED in zookeeper. Fails silently if the - * table is already disabled in zookeeper. Sets no watches. - * @param tableName - * @throws KeeperException unexpected zookeeper exception - */ - public void setDisabledTable(TableName tableName) - throws KeeperException { - synchronized (this.cache) { - if (!isDisablingOrDisabledTable(tableName)) { - LOG.warn("Moving table " + tableName + " state to disabled but was " + - "not first in disabling state: " + this.cache.get(tableName)); - } - setTableState(tableName, ZooKeeperProtos.Table.State.DISABLED); - } - } - - /** - * Sets the specified table as DISABLING in zookeeper. Fails silently if the - * table is already disabled in zookeeper. Sets no watches. - * @param tableName - * @throws KeeperException unexpected zookeeper exception - */ - public void setDisablingTable(final TableName tableName) - throws KeeperException { - synchronized (this.cache) { - if (!isEnabledOrDisablingTable(tableName)) { - LOG.warn("Moving table " + tableName + " state to disabling but was " + - "not first in enabled state: " + this.cache.get(tableName)); - } - setTableState(tableName, ZooKeeperProtos.Table.State.DISABLING); - } - } - - /** - * Sets the specified table as ENABLING in zookeeper. Fails silently if the - * table is already disabled in zookeeper. Sets no watches. - * @param tableName - * @throws KeeperException unexpected zookeeper exception - */ - public void setEnablingTable(final TableName tableName) - throws KeeperException { - synchronized (this.cache) { - if (!isDisabledOrEnablingTable(tableName)) { - LOG.warn("Moving table " + tableName + " state to enabling but was " + - "not first in disabled state: " + this.cache.get(tableName)); - } - setTableState(tableName, ZooKeeperProtos.Table.State.ENABLING); - } - } - - /** - * Sets the specified table as ENABLING in zookeeper atomically - * If the table is already in ENABLING state, no operation is performed - * @param tableName - * @return if the operation succeeds or not - * @throws KeeperException unexpected zookeeper exception - */ - public boolean checkAndSetEnablingTable(final TableName tableName) - throws KeeperException { - synchronized (this.cache) { - if (isEnablingOrEnabledTable(tableName)) { - return false; - } - setTableState(tableName, ZooKeeperProtos.Table.State.ENABLING); - return true; - } - } - - /** - * Sets the specified table as ENABLING in zookeeper atomically - * If the table isn't in DISABLED state, no operation is performed - * @param tableName - * @return if the operation succeeds or not - * @throws KeeperException unexpected zookeeper exception - */ - public boolean checkDisabledAndSetEnablingTable(final TableName tableName) - throws KeeperException { - synchronized (this.cache) { - if (!isDisabledTable(tableName)) { - return false; - } - setTableState(tableName, ZooKeeperProtos.Table.State.ENABLING); - return true; - } - } - - /** - * Sets the specified table as DISABLING in zookeeper atomically - * If the table isn't in ENABLED state, no operation is performed - * @param tableName - * @return if the operation succeeds or not - * @throws KeeperException unexpected zookeeper exception - */ - public boolean checkEnabledAndSetDisablingTable(final TableName tableName) - throws KeeperException { - synchronized (this.cache) { - if (this.cache.get(tableName) != null && !isEnabledTable(tableName)) { - return false; - } - setTableState(tableName, ZooKeeperProtos.Table.State.DISABLING); - return true; - } - } - - private void setTableState(final TableName tableName, final ZooKeeperProtos.Table.State state) - throws KeeperException { - String znode = ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()); - if (ZKUtil.checkExists(this.watcher, znode) == -1) { - ZKUtil.createAndFailSilent(this.watcher, znode); - } - synchronized (this.cache) { - ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder(); - builder.setState(state); - byte [] data = ProtobufUtil.prependPBMagic(builder.build().toByteArray()); - ZKUtil.setData(this.watcher, znode, data); - this.cache.put(tableName, state); - } - } - - public boolean isDisabledTable(final TableName tableName) { - return isTableState(tableName, ZooKeeperProtos.Table.State.DISABLED); - } - - public boolean isDisablingTable(final TableName tableName) { - return isTableState(tableName, ZooKeeperProtos.Table.State.DISABLING); - } - - public boolean isEnablingTable(final TableName tableName) { - return isTableState(tableName, ZooKeeperProtos.Table.State.ENABLING); - } - - public boolean isEnabledTable(TableName tableName) { - return isTableState(tableName, ZooKeeperProtos.Table.State.ENABLED); - } - - public boolean isDisablingOrDisabledTable(final TableName tableName) { - synchronized (this.cache) { - return isDisablingTable(tableName) || isDisabledTable(tableName); - } - } - - public boolean isEnablingOrEnabledTable(final TableName tableName) { - synchronized (this.cache) { - return isEnablingTable(tableName) || isEnabledTable(tableName); - } - } - - public boolean isEnabledOrDisablingTable(final TableName tableName) { - synchronized (this.cache) { - return isEnabledTable(tableName) || isDisablingTable(tableName); - } - } - - public boolean isDisabledOrEnablingTable(final TableName tableName) { - synchronized (this.cache) { - return isDisabledTable(tableName) || isEnablingTable(tableName); - } - } - - private boolean isTableState(final TableName tableName, final ZooKeeperProtos.Table.State state) { - synchronized (this.cache) { - ZooKeeperProtos.Table.State currentState = this.cache.get(tableName); - return ZKTableReadOnly.isTableState(currentState, state); - } - } - - /** - * Deletes the table in zookeeper. Fails silently if the - * table is not currently disabled in zookeeper. Sets no watches. - * @param tableName - * @throws KeeperException unexpected zookeeper exception - */ - public void setDeletedTable(final TableName tableName) - throws KeeperException { - synchronized (this.cache) { - if (this.cache.remove(tableName) == null) { - LOG.warn("Moving table " + tableName + " state to deleted but was " + - "already deleted"); - } - ZKUtil.deleteNodeFailSilent(this.watcher, - ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString())); - } - } - - /** - * Sets the ENABLED state in the cache and creates or force updates a node to - * ENABLED state for the specified table - * - * @param tableName - * @throws KeeperException - */ - public void setEnabledTable(final TableName tableName) throws KeeperException { - setTableState(tableName, ZooKeeperProtos.Table.State.ENABLED); - } - - /** - * check if table is present . - * - * @param tableName - * @return true if the table is present - */ - public boolean isTablePresent(final TableName tableName) { - synchronized (this.cache) { - ZooKeeperProtos.Table.State state = this.cache.get(tableName); - return !(state == null); - } - } - - /** - * Gets a list of all the tables set as disabled in zookeeper. - * @return Set of disabled tables, empty Set if none - */ - public Set getDisabledTables() { - Set disabledTables = new HashSet(); - synchronized (this.cache) { - Set tables = this.cache.keySet(); - for (TableName table: tables) { - if (isDisabledTable(table)) disabledTables.add(table); - } - } - return disabledTables; - } - - /** - * Gets a list of all the tables set as disabled in zookeeper. - * @return Set of disabled tables, empty Set if none - * @throws KeeperException - */ - public static Set getDisabledTables(ZooKeeperWatcher zkw) - throws KeeperException, InterruptedIOException { - return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLED); - } - - /** - * Gets a list of all the tables set as disabling in zookeeper. - * @return Set of disabling tables, empty Set if none - * @throws KeeperException - */ - public static Set getDisablingTables(ZooKeeperWatcher zkw) - throws KeeperException, InterruptedIOException { - return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLING); - } - - /** - * Gets a list of all the tables set as enabling in zookeeper. - * @return Set of enabling tables, empty Set if none - * @throws KeeperException - */ - public static Set getEnablingTables(ZooKeeperWatcher zkw) - throws KeeperException, InterruptedIOException { - return getAllTables(zkw, ZooKeeperProtos.Table.State.ENABLING); - } - - /** - * Gets a list of all the tables set as disabled in zookeeper. - * @return Set of disabled tables, empty Set if none - * @throws KeeperException - */ - public static Set getDisabledOrDisablingTables(ZooKeeperWatcher zkw) - throws KeeperException, InterruptedIOException { - return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLED, - ZooKeeperProtos.Table.State.DISABLING); - } - - /** - * If the table is found in ENABLING state the inmemory state is removed. This - * helps in cases where CreateTable is to be retried by the client incase of - * failures. If deleteZNode is true - the znode is also deleted - * - * @param tableName - * @param deleteZNode - * @throws KeeperException - */ - public void removeEnablingTable(final TableName tableName, boolean deleteZNode) - throws KeeperException { - synchronized (this.cache) { - if (isEnablingTable(tableName)) { - this.cache.remove(tableName); - if (deleteZNode) { - ZKUtil.deleteNodeFailSilent(this.watcher, - ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString())); - } - } - } - } - - - /** - * Gets a list of all the tables of specified states in zookeeper. - * @return Set of tables of specified states, empty Set if none - * @throws KeeperException - */ - static Set getAllTables(final ZooKeeperWatcher zkw, - final ZooKeeperProtos.Table.State... states) throws KeeperException, InterruptedIOException { - Set allTables = new HashSet(); - List children = - ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode); - if(children == null) return allTables; - for (String child: children) { - TableName tableName = TableName.valueOf(child); - ZooKeeperProtos.Table.State state = null; - try { - state = ZKTableReadOnly.getTableState(zkw, tableName); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - for (ZooKeeperProtos.Table.State expectedState: states) { - if (state == expectedState) { - allTables.add(tableName); - break; - } - } - } - return allTables; - } -} diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableReadOnly.java hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableReadOnly.java index 236aa7f..28f5ea3 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableReadOnly.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableReadOnly.java @@ -37,7 +37,7 @@ import java.util.Set; * clients other than AssignmentManager for reading the * state of a table in ZK. * - *

Does not cache state like {@link ZKTable}, actually reads from ZK each call. + *

Does not cache state like {@link ZKTableStateManager}, actually reads from ZK each call. */ @InterfaceAudience.Private public class ZKTableReadOnly { diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateManager.java hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateManager.java new file mode 100644 index 0000000..e181037 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateManager.java @@ -0,0 +1,469 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableStateManager; +import org.apache.hadoop.hbase.exceptions.ConsensusException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.zookeeper.KeeperException; + +import java.io.InterruptedIOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Helper class for table state tracking for use by AssignmentManager. + * Reads, caches and sets state up in zookeeper. If multiple read/write + * clients, will make for confusion. Read-only clients other than + * AssignmentManager interested in learning table state can use the + * read-only utility methods in {@link ZKTableReadOnly}. + * + *

To save on trips to the zookeeper ensemble, internally we cache table + * state. + */ +@InterfaceAudience.Private +public class ZKTableStateManager implements TableStateManager { + // A znode will exist under the table directory if it is in any of the + // following states: {@link TableState#ENABLING} , {@link TableState#DISABLING}, + // or {@link TableState#DISABLED}. If {@link TableState#ENABLED}, there will + // be no entry for a table in zk. Thats how it currently works. + + private static final Log LOG = LogFactory.getLog(ZKTableStateManager.class); + private final ZooKeeperWatcher watcher; + + /** + * Cache of what we found in zookeeper so we don't have to go to zk ensemble + * for every query. Synchronize access rather than use concurrent Map because + * synchronization needs to span query of zk. + */ + private final Map cache = + new HashMap(); + + // TODO: Make it so always a table znode. Put table schema here as well as table state. + // Have watcher on table znode so all are notified of state or schema change. + + public ZKTableStateManager(final ZooKeeperWatcher zkw) throws KeeperException, + InterruptedException { + super(); + this.watcher = zkw; + populateTableStates(); + } + + /** + * Gets a list of all the tables set as disabled in zookeeper. + * @throws KeeperException, InterruptedException + */ + private void populateTableStates() + throws KeeperException, InterruptedException { + synchronized (this.cache) { + List children = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.tableZNode); + if (children == null) return; + for (String child: children) { + TableName tableName = TableName.valueOf(child); + ZooKeeperProtos.Table.State state = ZKTableReadOnly.getTableState(this.watcher, tableName); + if (state != null) this.cache.put(tableName, state); + } + } + } + + /** + * Sets the specified table as DISABLED in zookeeper. Fails silently if the + * table is already disabled in zookeeper. Sets no watches. + * @param tableName table we're working on + * @throws ConsensusException if error happened in underlying coordination engine + */ + @Override + public void setDisabledTable(TableName tableName) + throws ConsensusException { + synchronized (this.cache) { + if (!isDisablingOrDisabledTable(tableName)) { + LOG.warn("Moving table " + tableName + " state to disabled but was " + + "not first in disabling state: " + this.cache.get(tableName)); + } + try { + setTableState(tableName, ZooKeeperProtos.Table.State.DISABLED); + } catch (KeeperException e) { + throw new ConsensusException(e); + } + } + } + + /** + * Sets the specified table as DISABLING in zookeeper. Fails silently if the + * table is already disabled in zookeeper. Sets no watches. + * @param tableName table we're working on + * @throws ConsensusException if error happened in underlying coordination engine + */ + @Override + public void setDisablingTable(final TableName tableName) + throws ConsensusException { + synchronized (this.cache) { + if (!isEnabledOrDisablingTable(tableName)) { + LOG.warn("Moving table " + tableName + " state to disabling but was " + + "not first in enabled state: " + this.cache.get(tableName)); + } + try { + setTableState(tableName, ZooKeeperProtos.Table.State.DISABLING); + } catch (KeeperException e) { + throw new ConsensusException(e); + } + } + } + + /** + * Sets the specified table as ENABLING in zookeeper. Fails silently if the + * table is already disabled in zookeeper. Sets no watches. + * @param tableName table we're working on + * @throws ConsensusException if error happened in underlying coordination engine + */ + @Override + public void setEnablingTable(final TableName tableName) + throws ConsensusException { + synchronized (this.cache) { + if (!isDisabledOrEnablingTable(tableName)) { + LOG.warn("Moving table " + tableName + " state to enabling but was " + + "not first in disabled state: " + this.cache.get(tableName)); + } + try { + setTableState(tableName, ZooKeeperProtos.Table.State.ENABLING); + } catch (KeeperException e) { + throw new ConsensusException(e); + } + } + } + + /** + * Sets the specified table as ENABLING in zookeeper atomically + * If the table is already in ENABLING state, no operation is performed + * @param tableName table we're working on + * @return if the operation succeeds or not + * @throws ConsensusException if error happened in underlying coordination engine + */ + @Override + public boolean checkAndSetEnablingTable(final TableName tableName) + throws ConsensusException { + synchronized (this.cache) { + if (isEnablingOrEnabledTable(tableName)) { + return false; + } + try { + setTableState(tableName, ZooKeeperProtos.Table.State.ENABLING); + } catch (KeeperException e) { + throw new ConsensusException(e); + } + return true; + } + } + + /** + * Sets the specified table as ENABLING in zookeeper atomically + * If the table isn't in DISABLED state, no operation is performed + * @param tableName table we're working on + * @return if the operation succeeds or not + * @throws ConsensusException if error happened in underlying coordination engine + */ + @Override + public boolean checkDisabledAndSetEnablingTable(final TableName tableName) + throws ConsensusException { + synchronized (this.cache) { + if (!isDisabledTable(tableName)) { + return false; + } + try { + setTableState(tableName, ZooKeeperProtos.Table.State.ENABLING); + } catch (KeeperException e) { + throw new ConsensusException(e); + } + return true; + } + } + + /** + * Sets the specified table as DISABLING in zookeeper atomically + * If the table isn't in ENABLED state, no operation is performed + * @param tableName table we're working on + * @return if the operation succeeds or not + * @throws ConsensusException if error happened in underlying coordination engine + */ + @Override + public boolean checkEnabledAndSetDisablingTable(final TableName tableName) + throws ConsensusException { + synchronized (this.cache) { + if (this.cache.get(tableName) != null && !isEnabledTable(tableName)) { + return false; + } + try { + setTableState(tableName, ZooKeeperProtos.Table.State.DISABLING); + } catch (KeeperException e) { + throw new ConsensusException(e); + } + return true; + } + } + + private void setTableState(final TableName tableName, final ZooKeeperProtos.Table.State state) + throws KeeperException { + String znode = ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()); + if (ZKUtil.checkExists(this.watcher, znode) == -1) { + ZKUtil.createAndFailSilent(this.watcher, znode); + } + synchronized (this.cache) { + ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder(); + builder.setState(state); + byte [] data = ProtobufUtil.prependPBMagic(builder.build().toByteArray()); + ZKUtil.setData(this.watcher, znode, data); + this.cache.put(tableName, state); + } + } + + @Override + public boolean isDisabledTable(final TableName tableName) { + return isTableState(tableName, ZooKeeperProtos.Table.State.DISABLED); + } + + @Override + public boolean isDisablingTable(final TableName tableName) { + return isTableState(tableName, ZooKeeperProtos.Table.State.DISABLING); + } + + @Override + public boolean isEnablingTable(final TableName tableName) { + return isTableState(tableName, ZooKeeperProtos.Table.State.ENABLING); + } + + @Override + public boolean isEnabledTable(TableName tableName) { + return isTableState(tableName, ZooKeeperProtos.Table.State.ENABLED); + } + + @Override + public boolean isDisablingOrDisabledTable(final TableName tableName) { + synchronized (this.cache) { + return isDisablingTable(tableName) || isDisabledTable(tableName); + } + } + + @Override + public boolean isEnablingOrEnabledTable(final TableName tableName) { + synchronized (this.cache) { + return isEnablingTable(tableName) || isEnabledTable(tableName); + } + } + + @Override + public boolean isEnabledOrDisablingTable(final TableName tableName) { + synchronized (this.cache) { + return isEnabledTable(tableName) || isDisablingTable(tableName); + } + } + + @Override + public boolean isDisabledOrEnablingTable(final TableName tableName) { + synchronized (this.cache) { + return isDisabledTable(tableName) || isEnablingTable(tableName); + } + } + + private boolean isTableState(final TableName tableName, final ZooKeeperProtos.Table.State state) { + synchronized (this.cache) { + ZooKeeperProtos.Table.State currentState = this.cache.get(tableName); + return ZKTableReadOnly.isTableState(currentState, state); + } + } + + /** + * Deletes the table in zookeeper. Fails silently if the + * table is not currently disabled in zookeeper. Sets no watches. + * @param tableName + * @throws KeeperException unexpected zookeeper exception + */ + @Override + public void setDeletedTable(final TableName tableName) + throws ConsensusException { + synchronized (this.cache) { + if (this.cache.remove(tableName) == null) { + LOG.warn("Moving table " + tableName + " state to deleted but was " + + "already deleted"); + } + try { + ZKUtil.deleteNodeFailSilent(this.watcher, + ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString())); + } catch (KeeperException e) { + throw new ConsensusException(e); + } + } + } + + /** + * Sets the ENABLED state in the cache and creates or force updates a node to + * ENABLED state for the specified table + * + * @param tableName table we're working on + * @throws ConsensusException if error happened in underlying coordination engine + */ + @Override + public void setEnabledTable(final TableName tableName) throws ConsensusException { + try { + setTableState(tableName, ZooKeeperProtos.Table.State.ENABLED); + } catch (KeeperException e) { + throw new ConsensusException(e); + } + } + + /** + * check if table is present . + * + * @param tableName table we're working on + * @return true if the table is present + */ + @Override + public boolean isTablePresent(final TableName tableName) { + synchronized (this.cache) { + ZooKeeperProtos.Table.State state = this.cache.get(tableName); + return !(state == null); + } + } + + /** + * Gets a list of all the tables set as disabled in zookeeper. + * @return Set of disabled tables, empty Set if none + */ + @Override + public Set getDisabledTables() { + Set disabledTables = new HashSet(); + synchronized (this.cache) { + Set tables = this.cache.keySet(); + for (TableName table: tables) { + if (isDisabledTable(table)) disabledTables.add(table); + } + } + return disabledTables; + } + + /** + * Gets a list of all the tables set as disabling in zookeeper. + * @return Set of disabling tables, empty Set if none + * @throws ConsensusException if error happened in underlying coordination engine + */ + @Override + public Set getDisablingTables() + throws InterruptedIOException, ConsensusException { + try { + return getAllTables(ZooKeeperProtos.Table.State.DISABLING); + } catch (KeeperException e) { + throw new ConsensusException(e); + } + } + + /** + * Gets a list of all the tables set as enabling in zookeeper. + * @return Set of enabling tables, empty Set if none + * @throws ConsensusException if error happened in underlying coordination engine + */ + @Override + public Set getEnablingTables() + throws InterruptedIOException, ConsensusException { + try { + return getAllTables(ZooKeeperProtos.Table.State.ENABLING); + } catch (KeeperException e) { + throw new ConsensusException(e); + } + } + + /** + * Gets a list of all the tables set as disabled in zookeeper. + * @return Set of disabled tables, empty Set if none + * @throws ConsensusException if error happened in underlying coordination engine + */ + @Override + public Set getDisabledOrDisablingTables() + throws InterruptedIOException, ConsensusException { + try { + return getAllTables(ZooKeeperProtos.Table.State.DISABLED, + ZooKeeperProtos.Table.State.DISABLING); + } catch (KeeperException e) { + throw new ConsensusException(e); + } + } + + /** + * If the table is found in ENABLING state the inmemory state is removed. This + * helps in cases where CreateTable is to be retried by the client incase of + * failures. If deleteZNode is true - the znode is also deleted + * + * @param tableName table we're working on + * @param boolean deletePermanentState + * @throws ConsensusException if error happened in underlying coordination engine + */ + @Override + public void removeEnablingTable(final TableName tableName, boolean deletePermanentState) + throws ConsensusException { + synchronized (this.cache) { + if (isEnablingTable(tableName)) { + this.cache.remove(tableName); + if (deletePermanentState) { + try { + ZKUtil.deleteNodeFailSilent(this.watcher, + ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString())); + } catch (KeeperException e) { + throw new ConsensusException(e); + } + } + } + } + } + + /** + * Gets a list of all the tables of specified states in zookeeper. + * @return Set of tables of specified states, empty Set if none + * @throws KeeperException + */ + Set getAllTables(final ZooKeeperProtos.Table.State... states) + throws KeeperException, InterruptedIOException { + + Set allTables = new HashSet(); + List children = + ZKUtil.listChildrenNoWatch(watcher, watcher.tableZNode); + if(children == null) return allTables; + for (String child: children) { + TableName tableName = TableName.valueOf(child); + ZooKeeperProtos.Table.State state = null; + try { + state = ZKTableReadOnly.getTableState(watcher, tableName); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + for (ZooKeeperProtos.Table.State expectedState: states) { + if (state == expectedState) { + allTables.add(tableName); + break; + } + } + } + return allTables; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index 8172fdc..6962707 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -73,7 +73,6 @@ public class LocalHBaseCluster { private final Class masterClass; private final Class regionServerClass; - ConsensusProvider consensusProvider; /** * Constructor. * @param conf @@ -140,7 +139,6 @@ public class LocalHBaseCluster { final Class regionServerClass) throws IOException { this.conf = conf; - consensusProvider = ConsensusProviderFactory.getConsensusProvider(conf); // Always have masters and regionservers come up on port '0' so we don't // clash over default ports. @@ -175,8 +173,14 @@ public class LocalHBaseCluster { // Create each regionserver with its own Configuration instance so each has // its HConnection instance rather than share (see HBASE_INSTANCES down in // the guts of HConnectionManager. + + // Also, create separate ConsensusProvider instance per Server. + // This is special case when we have to have more than 1 ConsensusProvider + // within 1 process. + ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(conf); + JVMClusterUtil.RegionServerThread rst = - JVMClusterUtil.createRegionServerThread(config, consensusProvider, + JVMClusterUtil.createRegionServerThread(config, cp, this.regionServerClass, index); this.regionThreads.add(rst); return rst; @@ -202,7 +206,13 @@ public class LocalHBaseCluster { // Create each master with its own Configuration instance so each has // its HConnection instance rather than share (see HBASE_INSTANCES down in // the guts of HConnectionManager. - JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, consensusProvider, + + // Also, create separate ConsensusProvider instance per Server. + // This is special case when we have to have more than 1 ConsensusProvider + // within 1 process. + ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(conf); + + JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp, (Class) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index); this.masterThreads.add(mt); return mt; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseConsensusProvider.java hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseConsensusProvider.java index 03f5fca..ca58619 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseConsensusProvider.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseConsensusProvider.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.consensus; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.ConsensusProvider; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableStateManager; /** * Base class for {@link org.apache.hadoop.hbase.ConsensusProvider} implementations. @@ -46,4 +47,9 @@ public abstract class BaseConsensusProvider implements ConsensusProvider { public Server getServer() { return null; } + + @Override + public synchronized TableStateManager getTableStateManager() throws InterruptedException { + return null; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkConsensusProvider.java hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkConsensusProvider.java index 56175ae..ceca07d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkConsensusProvider.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkConsensusProvider.java @@ -17,15 +17,21 @@ */ package org.apache.hadoop.hbase.consensus; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableStateManager; +import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; /** * ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.ConsensusProvider}. */ @InterfaceAudience.Private public class ZkConsensusProvider extends BaseConsensusProvider { + private static final Log LOG = LogFactory.getLog(ZkConsensusProvider.class); private Server server; private ZooKeeperWatcher watcher; @@ -39,4 +45,19 @@ public class ZkConsensusProvider extends BaseConsensusProvider { public Server getServer() { return server; } + + @Override + public TableStateManager getTableStateManager() throws InterruptedException { + try { + return new ZKTableStateManager(server.getZooKeeper()); + } catch (KeeperException e) { + LOG.error("ZK exception when initializing consensus:" + e.getMessage()); + server.abort("ZK exception when initializing consensus:", e); + } catch (InterruptedException e) { + LOG.error("ZK exception when initializing consensus:" + e.getMessage()); + server.abort("ZK exception when initializing consensus:", e); + } + + return null; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 9ae4371..0de492b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -52,9 +52,11 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.TableStateManager; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.exceptions.ConsensusException; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; @@ -82,7 +84,6 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Triple; import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.ipc.RemoteException; @@ -161,7 +162,7 @@ public class AssignmentManager extends ZooKeeperListener { final NavigableMap regionPlans = new TreeMap(); - private final ZKTable zkTable; + private final TableStateManager tableStateManager; private final ExecutorService executorService; @@ -247,7 +248,11 @@ public class AssignmentManager extends ZooKeeperListener { HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals( FavoredNodeLoadBalancer.class); try { - this.zkTable = new ZKTable(this.watcher); + if (server.getConsensusProvider() != null) { + this.tableStateManager = server.getConsensusProvider().getTableStateManager(); + } else { + this.tableStateManager = null; + } } catch (InterruptedException e) { throw new InterruptedIOException(); } @@ -277,12 +282,12 @@ public class AssignmentManager extends ZooKeeperListener { } /** - * @return Instance of ZKTable. + * @return Instance of ZKTableStateManager. */ - public ZKTable getZKTable() { + public TableStateManager getTableStateManager() { // These are 'expensive' to make involving trip to zk ensemble so allow // sharing. - return this.zkTable; + return this.tableStateManager; } /** @@ -400,15 +405,20 @@ public class AssignmentManager extends ZooKeeperListener { // Scan hbase:meta to build list of existing regions, servers, and assignment // Returns servers who have not checked in (assumed dead) and their regions - Map> deadServers = rebuildUserRegions(); - - // This method will assign all user regions if a clean server startup or - // it will reconstruct master state and cleanup any leftovers from - // previous master process. - processDeadServersAndRegionsInTransition(deadServers); + Map> deadServers = null; + try { + deadServers = rebuildUserRegions(); + // This method will assign all user regions if a clean server startup or + // it will reconstruct master state and cleanup any leftovers from + // previous master process. + processDeadServersAndRegionsInTransition(deadServers); - recoverTableInDisablingState(); - recoverTableInEnablingState(); + recoverTableInDisablingState(); + recoverTableInEnablingState(); + } catch (ConsensusException e) { + this.server.abort("Error in the coordination engine during Assignment" + + " manager startup:", e); + } } /** @@ -424,7 +434,7 @@ public class AssignmentManager extends ZooKeeperListener { */ void processDeadServersAndRegionsInTransition( final Map> deadServers) - throws KeeperException, IOException, InterruptedException { + throws KeeperException, IOException, InterruptedException, ConsensusException { List nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode); @@ -1134,7 +1144,8 @@ public class AssignmentManager extends ZooKeeperListener { HRegionInfo regionInfo = rs.getRegion(); String regionNameStr = regionInfo.getRegionNameAsString(); LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs); - boolean disabled = getZKTable().isDisablingOrDisabledTable(regionInfo.getTable()); + boolean disabled = getTableStateManager().isDisablingOrDisabledTable( + regionInfo.getTable()); ServerName serverName = rs.getServerName(); if (serverManager.isServerOnline(serverName)) { if (rs.isOnServer(serverName) @@ -1819,7 +1830,8 @@ public class AssignmentManager extends ZooKeeperListener { // assignRegion then we need to make the table ENABLED. Hence in such case the table // will not be in ENABLING or ENABLED state. TableName tableName = region.getTable(); - if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) { + if (!tableStateManager.isEnablingTable(tableName) && + !tableStateManager.isEnabledTable(tableName)) { LOG.debug("Setting table " + tableName + " to ENABLED state."); setEnabledTable(tableName); } @@ -2002,8 +2014,8 @@ public class AssignmentManager extends ZooKeeperListener { private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) { TableName tableName = region.getTable(); - boolean disabled = this.zkTable.isDisabledTable(tableName); - if (disabled || this.zkTable.isDisablingTable(tableName)) { + boolean disabled = this.tableStateManager.isDisabledTable(tableName); + if (disabled || this.tableStateManager.isDisablingTable(tableName)) { LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") + " skipping assign of " + region.getRegionNameAsString()); offlineDisabledRegion(region); @@ -2455,7 +2467,7 @@ public class AssignmentManager extends ZooKeeperListener { * @throws KeeperException */ private void assignAllUserRegions() - throws IOException, InterruptedException, KeeperException { + throws IOException, InterruptedException, KeeperException, ConsensusException { // Cleanup any existing ZK nodes and start watching ZKAssign.deleteAllNodes(watcher); ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, @@ -2465,8 +2477,10 @@ public class AssignmentManager extends ZooKeeperListener { // Skip assignment for regions of tables in DISABLING state because during clean cluster startup // no RS is alive and regions map also doesn't have any information about the regions. // See HBASE-6281. - Set disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher); - disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher)); + Set disabledOrDisablingOrEnabling = + tableStateManager.getDisabledOrDisablingTables(); + disabledOrDisablingOrEnabling.addAll(tableStateManager. + getEnablingTables()); // Scan hbase:meta for all user regions, skipping any disabled tables Map allRegions; SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment = @@ -2488,7 +2502,7 @@ public class AssignmentManager extends ZooKeeperListener { for (HRegionInfo hri : allRegions.keySet()) { TableName tableName = hri.getTable(); - if (!zkTable.isEnabledTable(tableName)) { + if (!tableStateManager.isEnabledTable(tableName)) { setEnabledTable(tableName); } } @@ -2527,11 +2541,15 @@ public class AssignmentManager extends ZooKeeperListener { * in META * @throws IOException */ - Map> rebuildUserRegions() throws IOException, KeeperException { - Set enablingTables = ZKTable.getEnablingTables(watcher); - Set disabledOrEnablingTables = ZKTable.getDisabledTables(watcher); + Map> rebuildUserRegions() throws + IOException, KeeperException, ConsensusException { + Set enablingTables = + tableStateManager.getEnablingTables(); + Set disabledOrEnablingTables = + tableStateManager.getDisabledTables(); disabledOrEnablingTables.addAll(enablingTables); - Set disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher); + Set disabledOrDisablingOrEnabling = + tableStateManager.getDisablingTables(); disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables); // Region assignment from META @@ -2584,7 +2602,7 @@ public class AssignmentManager extends ZooKeeperListener { // need to enable the table if not disabled or disabling or enabling // this will be used in rolling restarts if (!disabledOrDisablingOrEnabling.contains(tableName) - && !getZKTable().isEnabledTable(tableName)) { + && !getTableStateManager().isEnabledTable(tableName)) { setEnabledTable(tableName); } } else { @@ -2598,7 +2616,7 @@ public class AssignmentManager extends ZooKeeperListener { // need to enable the table if not disabled or disabling or enabling // this will be used in rolling restarts if (!disabledOrDisablingOrEnabling.contains(tableName) - && !getZKTable().isEnabledTable(tableName)) { + && !getTableStateManager().isEnabledTable(tableName)) { setEnabledTable(tableName); } } @@ -2615,8 +2633,9 @@ public class AssignmentManager extends ZooKeeperListener { * @throws IOException */ private void recoverTableInDisablingState() - throws KeeperException, TableNotFoundException, IOException { - Set disablingTables = ZKTable.getDisablingTables(watcher); + throws KeeperException, TableNotFoundException, IOException, ConsensusException { + Set disablingTables = + tableStateManager.getDisablingTables(); if (disablingTables.size() != 0) { for (TableName tableName : disablingTables) { // Recover by calling DisableTableHandler @@ -2638,8 +2657,9 @@ public class AssignmentManager extends ZooKeeperListener { * @throws IOException */ private void recoverTableInEnablingState() - throws KeeperException, TableNotFoundException, IOException { - Set enablingTables = ZKTable.getEnablingTables(watcher); + throws KeeperException, TableNotFoundException, IOException, ConsensusException { + Set enablingTables = tableStateManager. + getEnablingTables(); if (enablingTables.size() != 0) { for (TableName tableName : enablingTables) { // Recover by calling EnableTableHandler @@ -2876,7 +2896,7 @@ public class AssignmentManager extends ZooKeeperListener { } catch (KeeperException ke) { server.abort("Unexpected ZK exception deleting node " + hri, ke); } - if (zkTable.isDisablingOrDisabledTable(hri.getTable())) { + if (tableStateManager.isDisablingOrDisabledTable(hri.getTable())) { regionStates.regionOffline(hri); it.remove(); continue; @@ -2897,7 +2917,7 @@ public class AssignmentManager extends ZooKeeperListener { public void balance(final RegionPlan plan) { HRegionInfo hri = plan.getRegionInfo(); TableName tableName = hri.getTable(); - if (zkTable.isDisablingOrDisabledTable(tableName)) { + if (tableStateManager.isDisablingOrDisabledTable(tableName)) { LOG.info("Ignored moving region of disabling/disabled table " + tableName); return; @@ -2940,8 +2960,8 @@ public class AssignmentManager extends ZooKeeperListener { protected void setEnabledTable(TableName tableName) { try { - this.zkTable.setEnabledTable(tableName); - } catch (KeeperException e) { + this.tableStateManager.setEnabledTable(tableName); + } catch (ConsensusException e) { // here we can abort as it is the start up flow String errorMsg = "Unable to ensure that the table " + tableName + " will be" + " enabled because of a ZooKeeper issue"; @@ -3113,7 +3133,7 @@ public class AssignmentManager extends ZooKeeperListener { + hri_b.getRegionNameAsString() + ", on " + sn); // User could disable the table before master knows the new region. - if (zkTable.isDisablingOrDisabledTable(p.getTable())) { + if (tableStateManager.isDisablingOrDisabledTable(p.getTable())) { unassign(p); } } @@ -3239,7 +3259,7 @@ public class AssignmentManager extends ZooKeeperListener { + hri_b.getRegionNameAsString() + ", on " + sn); // User could disable the table before master knows the new region. - if (zkTable.isDisablingOrDisabledTable(p.getTable())) { + if (tableStateManager.isDisablingOrDisabledTable(p.getTable())) { unassign(hri_a); unassign(hri_b); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 01293f0..89f686a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -765,7 +765,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { } private void enableMeta(TableName metaTableName) { - if (!this.assignmentManager.getZKTable().isEnabledTable(metaTableName)) { + if (!this.assignmentManager.getTableStateManager().isEnabledTable(metaTableName)) { this.assignmentManager.setEnabledTable(metaTableName); } } @@ -1477,7 +1477,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { if (!MetaReader.tableExists(getCatalogTracker(), tableName)) { throw new TableNotFoundException(tableName); } - if (!getAssignmentManager().getZKTable(). + if (!getAssignmentManager().getTableStateManager(). isDisabledTable(tableName)) { throw new TableNotDisabledException(tableName); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java index 906cfd5..90c3b0c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java @@ -91,7 +91,7 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf public void process() { LOG.debug("Handling CLOSED event for " + regionInfo.getEncodedName()); // Check if this table is being disabled or not - if (this.assignmentManager.getZKTable(). + if (this.assignmentManager.getTableStateManager(). isDisablingOrDisabledTable(this.regionInfo.getTable())) { assignmentManager.offlineDisabledRegion(regionInfo); return; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java index 75cbde5..3d8eb6d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.exceptions.ConsensusException; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; @@ -49,7 +50,6 @@ import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.ModifyRegionUtils; -import org.apache.zookeeper.KeeperException; /** * Handler to create a table. @@ -130,10 +130,10 @@ public class CreateTableHandler extends EventHandler { // We could have cleared the hbase.rootdir and not zk. How can we detect this case? // Having to clean zk AND hdfs is awkward. try { - if (!assignmentManager.getZKTable().checkAndSetEnablingTable(tableName)) { + if (!assignmentManager.getTableStateManager().checkAndSetEnablingTable(tableName)) { throw new TableExistsException(tableName); } - } catch (KeeperException e) { + } catch (ConsensusException e) { throw new IOException("Unable to ensure that the table will be" + " enabling because of a ZooKeeper issue", e); } @@ -146,8 +146,8 @@ public class CreateTableHandler extends EventHandler { // again with the same Active master // It will block the creation saying TableAlreadyExists. try { - assignmentManager.getZKTable().removeEnablingTable(tableName, false); - } catch (KeeperException e) { + assignmentManager.getTableStateManager().removeEnablingTable(tableName, false); + } catch (ConsensusException e) { // Keeper exception should not happen here LOG.error("Got a keeper exception while removing the ENABLING table znode " + tableName, e); @@ -211,7 +211,7 @@ public class CreateTableHandler extends EventHandler { * - Update ZooKeeper with the enabled state */ private void handleCreateTable(TableName tableName) - throws IOException, KeeperException { + throws IOException, ConsensusException { Path tempdir = fileSystemManager.getTempDir(); FileSystem fs = fileSystemManager.getFileSystem(); @@ -239,8 +239,8 @@ public class CreateTableHandler extends EventHandler { // 6. Set table enabled flag up in zk. try { - assignmentManager.getZKTable().setEnabledTable(tableName); - } catch (KeeperException e) { + assignmentManager.getTableStateManager().setEnabledTable(tableName); + } catch (ConsensusException e) { throw new IOException("Unable to ensure that " + tableName + " will be" + " enabled because of a ZooKeeper issue", e); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java index 99b98ac..14ce790 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.catalog.MetaEditor; +import org.apache.hadoop.hbase.exceptions.ConsensusException; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.master.AssignmentManager; @@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.RegionState.State; -import org.apache.zookeeper.KeeperException; @InterfaceAudience.Private public class DeleteTableHandler extends TableEventHandler { @@ -62,7 +62,7 @@ public class DeleteTableHandler extends TableEventHandler { } protected void waitRegionInTransition(final List regions) - throws IOException, KeeperException { + throws IOException, ConsensusException { AssignmentManager am = this.masterServices.getAssignmentManager(); RegionStates states = am.getRegionStates(); long waitTime = server.getConfiguration(). @@ -93,7 +93,7 @@ public class DeleteTableHandler extends TableEventHandler { @Override protected void handleTableOperation(List regions) - throws IOException, KeeperException { + throws IOException, ConsensusException { MasterCoprocessorHost cpHost = ((HMaster) this.server).getMasterCoprocessorHost(); if (cpHost != null) { cpHost.preDeleteTableHandler(this.tableName); @@ -118,7 +118,7 @@ public class DeleteTableHandler extends TableEventHandler { // 5. If entry for this table in zk, and up in AssignmentManager, remove it. LOG.debug("Marking '" + tableName + "' as deleted."); - am.getZKTable().setDeletedTable(tableName); + am.getTableStateManager().setDeletedTable(tableName); } if (cpHost != null) { @@ -130,7 +130,7 @@ public class DeleteTableHandler extends TableEventHandler { * Removes the table from .META. and archives the HDFS files. */ protected void removeTableData(final List regions) - throws IOException, KeeperException { + throws IOException, ConsensusException { // 1. Remove regions from META LOG.debug("Deleting regions from META"); MetaEditor.deleteRegions(this.server.getCatalogTracker(), regions); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java index 8ee2dc0..8aa2eba 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.constraint.ConstraintException; +import org.apache.hadoop.hbase.exceptions.ConsensusException; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; @@ -43,7 +44,6 @@ import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; -import org.apache.zookeeper.KeeperException; import org.htrace.Trace; /** @@ -94,14 +94,14 @@ public class DisableTableHandler extends EventHandler { //TODO: reevaluate this since we have table locks now if (!skipTableStateCheck) { try { - if (!this.assignmentManager.getZKTable().checkEnabledAndSetDisablingTable + if (!this.assignmentManager.getTableStateManager().checkEnabledAndSetDisablingTable (this.tableName)) { LOG.info("Table " + tableName + " isn't enabled; skipping disable"); throw new TableNotEnabledException(this.tableName); } - } catch (KeeperException e) { + } catch (ConsensusException e) { throw new IOException("Unable to ensure that the table will be" + - " disabling because of a ZooKeeper issue", e); + " disabling because of a coordination engine issue", e); } } success = true; @@ -139,7 +139,7 @@ public class DisableTableHandler extends EventHandler { } } catch (IOException e) { LOG.error("Error trying to disable table " + this.tableName, e); - } catch (KeeperException e) { + } catch (ConsensusException e) { LOG.error("Error trying to disable table " + this.tableName, e); } finally { releaseTableLock(); @@ -156,9 +156,9 @@ public class DisableTableHandler extends EventHandler { } } - private void handleDisableTable() throws IOException, KeeperException { + private void handleDisableTable() throws IOException, ConsensusException { // Set table disabling flag up in zk. - this.assignmentManager.getZKTable().setDisablingTable(this.tableName); + this.assignmentManager.getTableStateManager().setDisablingTable(this.tableName); boolean done = false; while (true) { // Get list of online regions that are of this table. Regions that are @@ -186,7 +186,7 @@ public class DisableTableHandler extends EventHandler { } } // Flip the table to disabled if success. - if (done) this.assignmentManager.getZKTable().setDisabledTable(this.tableName); + if (done) this.assignmentManager.getTableStateManager().setDisabledTable(this.tableName); LOG.info("Disabled table is done=" + done); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java index 11a5606..0f313c5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.exceptions.ConsensusException; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; @@ -46,7 +47,6 @@ import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.util.Pair; -import org.apache.zookeeper.KeeperException; /** * Handler to run enable of a table. @@ -88,9 +88,9 @@ public class EnableTableHandler extends EventHandler { throw new TableNotFoundException(tableName); } try { - this.assignmentManager.getZKTable().removeEnablingTable(tableName, true); + this.assignmentManager.getTableStateManager().removeEnablingTable(tableName, true); throw new TableNotFoundException(tableName); - } catch (KeeperException e) { + } catch (ConsensusException e) { // TODO : Use HBCK to clear such nodes LOG.warn("Failed to delete the ENABLING node for the table " + tableName + ". The table will remain unusable. Run HBCK to manually fix the problem."); @@ -103,14 +103,14 @@ public class EnableTableHandler extends EventHandler { // DISABLED or ENABLED. if (!skipTableStateCheck) { try { - if (!this.assignmentManager.getZKTable().checkDisabledAndSetEnablingTable + if (!this.assignmentManager.getTableStateManager().checkDisabledAndSetEnablingTable (this.tableName)) { LOG.info("Table " + tableName + " isn't disabled; skipping enable"); throw new TableNotDisabledException(this.tableName); } - } catch (KeeperException e) { + } catch (ConsensusException e) { throw new IOException("Unable to ensure that the table will be" + - " enabling because of a ZooKeeper issue", e); + " enabling because of a coordination engine issue", e); } } success = true; @@ -147,7 +147,7 @@ public class EnableTableHandler extends EventHandler { } } catch (IOException e) { LOG.error("Error trying to enable the table " + this.tableName, e); - } catch (KeeperException e) { + } catch (ConsensusException e) { LOG.error("Error trying to enable the table " + this.tableName, e); } catch (InterruptedException e) { LOG.error("Error trying to enable the table " + this.tableName, e); @@ -166,12 +166,12 @@ public class EnableTableHandler extends EventHandler { } } - private void handleEnableTable() throws IOException, KeeperException, InterruptedException { + private void handleEnableTable() throws IOException, ConsensusException, InterruptedException { // I could check table is disabling and if so, not enable but require // that user first finish disabling but that might be obnoxious. // Set table enabling flag up in zk. - this.assignmentManager.getZKTable().setEnablingTable(this.tableName); + this.assignmentManager.getTableStateManager().setEnablingTable(this.tableName); boolean done = false; ServerManager serverManager = ((HMaster)this.server).getServerManager(); // Get the regions of this table. We're done when all listed @@ -206,7 +206,7 @@ public class EnableTableHandler extends EventHandler { } if (done) { // Flip the table to enabled. - this.assignmentManager.getZKTable().setEnabledTable( + this.assignmentManager.getTableStateManager().setEnabledTable( this.tableName); LOG.info("Table '" + this.tableName + "' was successfully enabled. Status: done=" + done); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java index f711bd1..d8ff09e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java @@ -113,7 +113,8 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf " because regions is NOT in RIT -- presuming this is because it SPLIT"); } if (!openedNodeDeleted) { - if (this.assignmentManager.getZKTable().isDisablingOrDisabledTable(regionInfo.getTable())) { + if (this.assignmentManager.getTableStateManager(). + isDisablingOrDisabledTable(regionInfo.getTable())) { debugLog(regionInfo, "Opened region " + regionInfo.getShortNameToLog() + " but " + "this table is disabled, triggering close of region"); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index 056d5d3..0e0246d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -257,7 +257,7 @@ public class ServerShutdownHandler extends EventHandler { toAssignRegions.add(hri); } else if (rit != null) { if (rit.isPendingCloseOrClosing() - && am.getZKTable().isDisablingOrDisabledTable(hri.getTable())) { + && am.getTableStateManager().isDisablingOrDisabledTable(hri.getTable())) { // If the table was partially disabled and the RS went down, we should clear the RIT // and remove the node for the region. // The rit that we use may be stale in case the table was in DISABLING state @@ -334,14 +334,14 @@ public class ServerShutdownHandler extends EventHandler { public static boolean processDeadRegion(HRegionInfo hri, Result result, AssignmentManager assignmentManager, CatalogTracker catalogTracker) throws IOException { - boolean tablePresent = assignmentManager.getZKTable().isTablePresent(hri.getTable()); + boolean tablePresent = assignmentManager.getTableStateManager().isTablePresent(hri.getTable()); if (!tablePresent) { LOG.info("The table " + hri.getTable() + " was deleted. Hence not proceeding."); return false; } // If table is not disabled but the region is offlined, - boolean disabled = assignmentManager.getZKTable().isDisabledTable(hri.getTable()); + boolean disabled = assignmentManager.getTableStateManager().isDisabledTable(hri.getTable()); if (disabled){ LOG.info("The table " + hri.getTable() + " was disabled. Hence not proceeding."); @@ -353,7 +353,7 @@ public class ServerShutdownHandler extends EventHandler { //to the dead server. We don't have to do anything. return false; } - boolean disabling = assignmentManager.getZKTable().isDisablingTable(hri.getTable()); + boolean disabling = assignmentManager.getTableStateManager().isDisablingTable(hri.getTable()); if (disabling) { LOG.info("The table " + hri.getTable() + " is disabled. Hence not assigning region" + hri.getEncodedName()); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java index 6e3176c..a390c8c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java @@ -39,13 +39,13 @@ import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.exceptions.ConsensusException; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.BulkReOpen; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.zookeeper.KeeperException; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -128,7 +128,7 @@ public abstract class TableEventHandler extends EventHandler { tableName); handleTableOperation(hris); if (eventType.isOnlineSchemaChangeSupported() && this.masterServices. - getAssignmentManager().getZKTable(). + getAssignmentManager().getTableStateManager(). isEnabledTable(tableName)) { if (reOpenAllRegions(hris)) { LOG.info("Completed table operation " + eventType + " on table " + @@ -141,7 +141,7 @@ public abstract class TableEventHandler extends EventHandler { } catch (IOException e) { LOG.error("Error manipulating table " + tableName, e); completed(e); - } catch (KeeperException e) { + } catch (ConsensusException e) { LOG.error("Error manipulating table " + tableName, e); completed(e); } finally { @@ -249,5 +249,5 @@ public abstract class TableEventHandler extends EventHandler { } protected abstract void handleTableOperation(List regions) - throws IOException, KeeperException; + throws IOException, ConsensusException; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TruncateTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TruncateTableHandler.java index 7cc30b4..80553f2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TruncateTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TruncateTableHandler.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.catalog.MetaEditor; +import org.apache.hadoop.hbase.exceptions.ConsensusException; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.master.AssignmentManager; @@ -67,7 +68,7 @@ public class TruncateTableHandler extends DeleteTableHandler { @Override protected void handleTableOperation(List regions) - throws IOException, KeeperException { + throws IOException, ConsensusException { MasterCoprocessorHost cpHost = ((HMaster) this.server).getMasterCoprocessorHost(); if (cpHost != null) { cpHost.preTruncateTableHandler(this.tableName); @@ -137,8 +138,8 @@ public class TruncateTableHandler extends DeleteTableHandler { // 6. Set table enabled flag up in zk. try { - assignmentManager.getZKTable().setEnabledTable(tableName); - } catch (KeeperException e) { + assignmentManager.getTableStateManager().setEnabledTable(tableName); + } catch (ConsensusException e) { throw new IOException("Unable to ensure that " + tableName + " will be" + " enabled because of a ZooKeeper issue", e); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index 27cd5c7..6245e89 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -556,13 +556,13 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable // if the table is enabled, then have the RS run actually the snapshot work TableName snapshotTable = TableName.valueOf(snapshot.getTable()); AssignmentManager assignmentMgr = master.getAssignmentManager(); - if (assignmentMgr.getZKTable().isEnabledTable(snapshotTable)) { + if (assignmentMgr.getTableStateManager().isEnabledTable(snapshotTable)) { LOG.debug("Table enabled, starting distributed snapshot."); snapshotEnabledTable(snapshot); LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot)); } // For disabled table, snapshot is created by the master - else if (assignmentMgr.getZKTable().isDisabledTable(snapshotTable)) { + else if (assignmentMgr.getTableStateManager().isDisabledTable(snapshotTable)) { LOG.debug("Table is disabled, running snapshot entirely on master."); snapshotDisabledTable(snapshot); LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot)); @@ -692,7 +692,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable // Execute the restore/clone operation if (MetaReader.tableExists(master.getCatalogTracker(), tableName)) { - if (master.getAssignmentManager().getZKTable().isEnabledTable( + if (master.getAssignmentManager().getTableStateManager().isEnabledTable( TableName.valueOf(fsSnapshot.getTable()))) { throw new UnsupportedOperationException("Table '" + TableName.valueOf(fsSnapshot.getTable()) + "' must be disabled in order to " + diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index e01bb00..6b9fe70 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -123,7 +123,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf, - RegionServerServices server, final LastSequenceId sequenceIdChecker) { + final RegionServerServices server, final LastSequenceId sequenceIdChecker) { this(watcher, conf, server, new TaskExecutor() { @Override public Status exec(String filename, CancelableProgressable p) { @@ -141,7 +141,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { // encountered a bad non-retry-able persistent error. try { if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)), - fs, conf, p, sequenceIdChecker, watcher)) { + fs, conf, p, sequenceIdChecker, watcher, server.getConsensusProvider())) { return Status.PREEMPTED; } } catch (InterruptedIOException iioe) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index fdf71cc..1301a37 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -56,6 +56,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ConsensusProvider; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -66,6 +67,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.TableStateManager; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.client.ConnectionUtils; @@ -74,6 +76,7 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.exceptions.ConsensusException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.HFile; @@ -104,7 +107,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZKTable; +import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.MultipleIOException; import org.apache.zookeeper.KeeperException; @@ -135,6 +138,7 @@ public class HLogSplitter { private Set disablingOrDisabledTables = new HashSet(); private ZooKeeperWatcher watcher; + private ConsensusProvider consensusProvider; // If an exception is thrown by one of the other threads, it will be // stored here. @@ -168,7 +172,8 @@ public class HLogSplitter { private final int minBatchSize; HLogSplitter(Configuration conf, Path rootDir, - FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) { + FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw, + ConsensusProvider consensusProvider) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); @@ -177,6 +182,7 @@ public class HLogSplitter { this.fs = fs; this.sequenceIdChecker = idChecker; this.watcher = zkw; + this.consensusProvider = consensusProvider; entryBuffers = new EntryBuffers( this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", @@ -188,7 +194,7 @@ public class HLogSplitter { this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf); this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); - if (zkw != null && this.distributedLogReplay) { + if (zkw != null && consensusProvider != null && this.distributedLogReplay) { outputSink = new LogReplayOutputSink(numWriterThreads); } else { if (this.distributedLogReplay) { @@ -219,8 +225,9 @@ public class HLogSplitter { */ public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, - ZooKeeperWatcher zkw) throws IOException { - HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw); + ZooKeeperWatcher zkw, ConsensusProvider cp) throws IOException { + HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, + cp); return s.splitLogFile(logfile, reporter); } @@ -234,7 +241,7 @@ public class HLogSplitter { List splits = new ArrayList(); if (logfiles != null && logfiles.length > 0) { for (FileStatus logfile: logfiles) { - HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null); + HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null); if (s.splitLogFile(logfile, null)) { finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { @@ -288,10 +295,11 @@ public class HLogSplitter { LOG.warn("Nothing to split in log file " + logPath); return true; } - if(watcher != null) { + if(watcher != null && consensusProvider != null) { try { - disablingOrDisabledTables = ZKTable.getDisabledOrDisablingTables(watcher); - } catch (KeeperException e) { + TableStateManager tsm = consensusProvider.getTableStateManager(); + disablingOrDisabledTables = tsm.getDisabledOrDisablingTables(); + } catch (ConsensusException e) { throw new IOException("Can't get disabling/disabled tables", e); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java index 85b833f..a3341e8 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.consensus.ZkConsensusProvider; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorType; @@ -108,6 +109,12 @@ public class TestDrainingServer { Mockito.when(server.getServerName()).thenReturn(ServerName.valueOf("masterMock,1,1")); Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher); + ConsensusProvider cp = new ZkConsensusProvider(); + cp.initialize(server); + cp.start(); + + Mockito.when(server.getConsensusProvider()).thenReturn(cp); + Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers); Mockito.when(serverManager.getOnlineServersList()) .thenReturn(new ArrayList(onlineServers.keySet())); @@ -204,6 +211,12 @@ public class TestDrainingServer { Mockito.when(server.getServerName()).thenReturn(ServerName.valueOf("masterMock,1,1")); Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher); + ConsensusProvider cp = new ZkConsensusProvider(); + cp.initialize(server); + cp.start(); + + Mockito.when(server.getConsensusProvider()).thenReturn(cp); + Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers); Mockito.when(serverManager.getOnlineServersList()).thenReturn( new ArrayList(onlineServers.keySet())); @@ -291,4 +304,4 @@ public class TestDrainingServer { executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3); return executor; } -} \ No newline at end of file +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java index 998cabb..a59fdf9 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java @@ -255,7 +255,7 @@ public class TestAdmin { this.admin.disableTable(ht.getName()); assertTrue("Table must be disabled.", TEST_UTIL.getHBaseCluster() - .getMaster().getAssignmentManager().getZKTable().isDisabledTable( + .getMaster().getAssignmentManager().getTableStateManager().isDisabledTable( ht.getName())); // Test that table is disabled @@ -270,7 +270,7 @@ public class TestAdmin { assertTrue(ok); this.admin.enableTable(table); assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster() - .getMaster().getAssignmentManager().getZKTable().isEnabledTable( + .getMaster().getAssignmentManager().getTableStateManager().isEnabledTable( ht.getName())); // Test that table is enabled @@ -343,7 +343,7 @@ public class TestAdmin { tables = this.admin.listTables(); assertEquals(numTables + 1, tables.length); assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster() - .getMaster().getAssignmentManager().getZKTable().isEnabledTable( + .getMaster().getAssignmentManager().getTableStateManager().isEnabledTable( TableName.valueOf("testCreateTable"))); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java index 20fbc0f..9451915 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java @@ -52,6 +52,8 @@ import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.ConsensusProvider; import org.apache.hadoop.hbase.ConsensusProviderFactory; +import org.apache.hadoop.hbase.consensus.ZkConsensusProvider; +import org.apache.hadoop.hbase.exceptions.ConsensusException; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; @@ -116,6 +118,7 @@ public class TestAssignmentManager { private Server server; private ServerManager serverManager; private ZooKeeperWatcher watcher; + private ConsensusProvider cp; private LoadBalancer balancer; private HMaster master; @@ -146,6 +149,12 @@ public class TestAssignmentManager { Mockito.doThrow(new RuntimeException("Aborted")). when(server).abort(Mockito.anyString(), (Throwable)Mockito.anyObject()); + cp = new ZkConsensusProvider(); + cp.initialize(this.server); + cp.start(); + + Mockito.when(server.getConsensusProvider()).thenReturn(cp); + // Mock a ServerManager. Say server SERVERNAME_{A,B} are online. Also // make it so if close or open, we return 'success'. this.serverManager = Mockito.mock(ServerManager.class); @@ -184,6 +193,7 @@ public class TestAssignmentManager { // Clean up all znodes ZKAssign.deleteAllNodes(this.watcher); this.watcher.close(); + this.cp.stop(); } } @@ -466,7 +476,7 @@ public class TestAssignmentManager { */ @Test public void testSSHWhenDisableTableInProgress() throws KeeperException, IOException, - ServiceException { + ConsensusException, ServiceException { testCaseWithPartiallyDisabledState(Table.State.DISABLING); testCaseWithPartiallyDisabledState(Table.State.DISABLED); } @@ -488,7 +498,8 @@ public class TestAssignmentManager { } private void testCaseWithSplitRegionPartial(boolean regionSplitDone) throws KeeperException, - IOException, NodeExistsException, InterruptedException, ServiceException { + IOException, NodeExistsException, InterruptedException, + ConsensusException, ServiceException { // Create and startup an executor. This is used by AssignmentManager // handling zk callbacks. ExecutorService executor = startupMasterExecutor("testSSHWhenSplitRegionInProgress"); @@ -504,7 +515,7 @@ public class TestAssignmentManager { // adding region in pending close. am.getRegionStates().updateRegionState( REGIONINFO, State.SPLITTING, SERVERNAME_A); - am.getZKTable().setEnabledTable(REGIONINFO.getTable()); + am.getTableStateManager().setEnabledTable(REGIONINFO.getTable()); RegionTransition data = RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_SPLITTING, REGIONINFO.getRegionName(), SERVERNAME_A); String node = ZKAssign.getNodeName(this.watcher, REGIONINFO.getEncodedName()); @@ -536,7 +547,7 @@ public class TestAssignmentManager { } private void testCaseWithPartiallyDisabledState(Table.State state) throws KeeperException, - IOException, NodeExistsException, ServiceException { + IOException, NodeExistsException, ConsensusException, ServiceException { // Create and startup an executor. This is used by AssignmentManager // handling zk callbacks. ExecutorService executor = startupMasterExecutor("testSSHWhenDisableTableInProgress"); @@ -553,9 +564,9 @@ public class TestAssignmentManager { // adding region in pending close. am.getRegionStates().updateRegionState(REGIONINFO, State.PENDING_CLOSE); if (state == Table.State.DISABLING) { - am.getZKTable().setDisablingTable(REGIONINFO.getTable()); + am.getTableStateManager().setDisablingTable(REGIONINFO.getTable()); } else { - am.getZKTable().setDisabledTable(REGIONINFO.getTable()); + am.getTableStateManager().setDisabledTable(REGIONINFO.getTable()); } RegionTransition data = RegionTransition.createRegionTransition(EventType.M_ZK_REGION_CLOSING, REGIONINFO.getRegionName(), SERVERNAME_A); @@ -713,7 +724,8 @@ public class TestAssignmentManager { */ @Test(timeout = 60000) public void testProcessDeadServersAndRegionsInTransitionShouldNotFailWithNPE() - throws IOException, KeeperException, InterruptedException, ServiceException { + throws IOException, KeeperException, ConsensusException, + InterruptedException, ServiceException { final RecoverableZooKeeper recoverableZk = Mockito .mock(RecoverableZooKeeper.class); AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager( @@ -842,7 +854,7 @@ public class TestAssignmentManager { */ @Test(timeout = 60000) public void testRegionInOpeningStateOnDeadRSWhileMasterFailover() throws IOException, - KeeperException, ServiceException, InterruptedException { + KeeperException, ServiceException, ConsensusException, InterruptedException { AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager( this.server, this.serverManager); ZKAssign.createNodeOffline(this.watcher, REGIONINFO, SERVERNAME_A); @@ -858,7 +870,7 @@ public class TestAssignmentManager { am.gate.set(false); CatalogTracker ct = Mockito.mock(CatalogTracker.class); assertFalse(am.processRegionsInTransition(rt, REGIONINFO, version)); - am.getZKTable().setEnabledTable(REGIONINFO.getTable()); + am.getTableStateManager().setEnabledTable(REGIONINFO.getTable()); processServerShutdownHandler(ct, am, false); // Waiting for the assignment to get completed. while (!am.gate.get()) { @@ -899,7 +911,7 @@ public class TestAssignmentManager { } try{ // set table in disabling state. - am.getZKTable().setDisablingTable(REGIONINFO.getTable()); + am.getTableStateManager().setDisablingTable(REGIONINFO.getTable()); am.joinCluster(); // should not call retainAssignment if we get empty regions in assignAllUserRegions. assertFalse( @@ -907,12 +919,12 @@ public class TestAssignmentManager { gate.get()); // need to change table state from disabling to disabled. assertTrue("Table should be disabled.", - am.getZKTable().isDisabledTable(REGIONINFO.getTable())); + am.getTableStateManager().isDisabledTable(REGIONINFO.getTable())); } finally { this.server.getConfiguration().setClass( HConstants.HBASE_MASTER_LOADBALANCER_CLASS, SimpleLoadBalancer.class, LoadBalancer.class); - am.getZKTable().setEnabledTable(REGIONINFO.getTable()); + am.getTableStateManager().setEnabledTable(REGIONINFO.getTable()); am.shutdown(); } } @@ -939,17 +951,17 @@ public class TestAssignmentManager { this.serverManager); try { // set table in enabling state. - am.getZKTable().setEnablingTable(REGIONINFO.getTable()); + am.getTableStateManager().setEnablingTable(REGIONINFO.getTable()); new EnableTableHandler(server, REGIONINFO.getTable(), am.getCatalogTracker(), am, new NullTableLockManager(), true).prepare() .process(); assertEquals("Number of assignments should be 1.", 1, assignmentCount); assertTrue("Table should be enabled.", - am.getZKTable().isEnabledTable(REGIONINFO.getTable())); + am.getTableStateManager().isEnabledTable(REGIONINFO.getTable())); } finally { enabling = false; assignmentCount = 0; - am.getZKTable().setEnabledTable(REGIONINFO.getTable()); + am.getTableStateManager().setEnabledTable(REGIONINFO.getTable()); am.shutdown(); ZKAssign.deleteAllNodes(this.watcher); } @@ -978,10 +990,10 @@ public class TestAssignmentManager { try { TableName tableName = TableName.valueOf("dummyTable"); // set table in enabling state. - am.getZKTable().setEnablingTable(tableName); + am.getTableStateManager().setEnablingTable(tableName); am.joinCluster(); assertFalse("Table should not be present in zookeeper.", - am.getZKTable().isTablePresent(tableName)); + am.getTableStateManager().isTablePresent(tableName)); } finally { } } @@ -992,7 +1004,7 @@ public class TestAssignmentManager { */ @Test public void testSSHTimesOutOpeningRegionTransition() - throws KeeperException, IOException, ServiceException { + throws KeeperException, IOException, ConsensusException, ServiceException { // We need a mocked catalog tracker. CatalogTracker ct = Mockito.mock(CatalogTracker.class); // Create an AM. @@ -1006,7 +1018,7 @@ public class TestAssignmentManager { // adding region plan am.regionPlans.put(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, SERVERNAME_B, SERVERNAME_A)); - am.getZKTable().setEnabledTable(REGIONINFO.getTable()); + am.getTableStateManager().setEnabledTable(REGIONINFO.getTable()); try { am.assignInvoked = false; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java index 0344cef..d211981 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java @@ -567,7 +567,7 @@ public class TestAssignmentManagerOnCluster { Thread.sleep(100); } - am.getZKTable().setDisablingTable(table); + am.getTableStateManager().setDisablingTable(table); List toAssignRegions = am.processServerShutdown(destServerName); assertTrue("Regions to be assigned should be empty.", toAssignRegions.isEmpty()); assertTrue("Regions to be assigned should be empty.", am.getRegionStates() @@ -576,7 +576,7 @@ public class TestAssignmentManagerOnCluster { if (hri != null && serverName != null) { am.regionOnline(hri, serverName); } - am.getZKTable().setDisabledTable(table); + am.getTableStateManager().setDisabledTable(table); TEST_UTIL.deleteTable(table); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java index 5216366..9bdb36d 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java @@ -67,7 +67,7 @@ public class TestMaster { HMaster m = cluster.getMaster(); HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME); - assertTrue(m.assignmentManager.getZKTable().isEnabledTable(TABLENAME)); + assertTrue(m.assignmentManager.getTableStateManager().isEnabledTable(TABLENAME)); TEST_UTIL.loadTable(ht, FAMILYNAME, false); ht.close(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java index af33ea7..6f08294 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableStateManager; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -63,7 +64,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKTable; +import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.data.Stat; import org.junit.Test; @@ -305,7 +306,7 @@ public class TestMasterFailover { log("Beginning to mock scenarios"); // Disable the disabledTable in ZK - ZKTable zktable = new ZKTable(zkw); + TableStateManager zktable = new ZKTableStateManager(zkw); zktable.setDisabledTable(disabledTable); /* @@ -620,7 +621,7 @@ public class TestMasterFailover { log("Assignment completed"); assertTrue(" Table must be enabled.", master.getAssignmentManager() - .getZKTable().isEnabledTable(TableName.valueOf("enabledTable"))); + .getTableStateManager().isEnabledTable(TableName.valueOf("enabledTable"))); // we also need regions assigned out on the dead server List enabledAndOnDeadRegions = new ArrayList(); enabledAndOnDeadRegions.addAll(enabledRegions.subList(0, 6)); @@ -694,7 +695,7 @@ public class TestMasterFailover { log("Beginning to mock scenarios"); // Disable the disabledTable in ZK - ZKTable zktable = new ZKTable(zkw); + TableStateManager zktable = new ZKTableStateManager(zkw); zktable.setDisabledTable(disabledTable); assertTrue(" The enabled table should be identified on master fail over.", diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterRestartAfterDisablingTable.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterRestartAfterDisablingTable.java index c7ddddb..3b46807 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterRestartAfterDisablingTable.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterRestartAfterDisablingTable.java @@ -96,7 +96,7 @@ public class TestMasterRestartAfterDisablingTable { cluster.waitForActiveAndReadyMaster(); assertTrue("The table should not be in enabled state", cluster.getMaster() - .getAssignmentManager().getZKTable().isDisablingOrDisabledTable( + .getAssignmentManager().getTableStateManager().isDisablingOrDisabledTable( TableName.valueOf("tableRestart"))); log("Enabling table\n"); // Need a new Admin, the previous one is on the old master @@ -111,7 +111,7 @@ public class TestMasterRestartAfterDisablingTable { + " switch except for the catalog and namespace tables.", 6, regions.size()); assertTrue("The table should be in enabled state", cluster.getMaster() - .getAssignmentManager().getZKTable() + .getAssignmentManager().getTableStateManager() .isEnabledTable(TableName.valueOf("tableRestart"))); ht.close(); TEST_UTIL.shutdownMiniCluster(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java index ce6b777..7bd7893 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.MockServer; import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKTable; +import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -130,7 +130,7 @@ public class TestOpenedRegionHandler { // create a node with OPENED state zkw = HBaseTestingUtility.createAndForceNodeToOpenedState(TEST_UTIL, region, server.getServerName()); - when(am.getZKTable()).thenReturn(new ZKTable(zkw)); + when(am.getTableStateManager()).thenReturn(new ZKTableStateManager(zkw)); Stat stat = new Stat(); String nodeName = ZKAssign.getNodeName(zkw, region.getRegionInfo() .getEncodedName()); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java index 5b68f9f..d376c29 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java @@ -112,7 +112,7 @@ public class TestHLogMethods { public void testEntrySink() throws Exception { Configuration conf = new Configuration(); HLogSplitter splitter = new HLogSplitter( - conf, mock(Path.class), mock(FileSystem.class), null, null); + conf, mock(Path.class), mock(FileSystem.class), null, null, null); EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024); for (int i = 0; i < 1000; i++) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index 7ecef82..e6495c4 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -805,7 +805,7 @@ public class TestHLogSplit { logfiles != null && logfiles.length > 0); // Set up a splitter that will throw an IOE on the output side HLogSplitter logSplitter = new HLogSplitter( - conf, HBASEDIR, fs, null, null) { + conf, HBASEDIR, fs, null, null, null) { protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class); @@ -938,7 +938,7 @@ public class TestHLogSplit { try { conf.setInt("hbase.splitlog.report.period", 1000); boolean ret = HLogSplitter.splitLogFile( - HBASEDIR, logfile, spiedFs, conf, localReporter, null, null); + HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null); assertFalse("Log splitting should failed", ret); assertTrue(count.get() > 0); } catch (IOException e) { @@ -997,7 +997,7 @@ public class TestHLogSplit { // Create a splitter that reads and writes the data without touching disk HLogSplitter logSplitter = new HLogSplitter( - localConf, HBASEDIR, fs, null, null) { + localConf, HBASEDIR, fs, null, null, null) { /* Produce a mock writer that doesn't write anywhere */ protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) @@ -1282,7 +1282,7 @@ public class TestHLogSplit { logfiles != null && logfiles.length > 0); HLogSplitter logSplitter = new HLogSplitter( - conf, HBASEDIR, fs, null, null) { + conf, HBASEDIR, fs, null, null, null) { protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 515aef0..05aeedb 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -873,7 +873,7 @@ public class TestWALReplay { wal.close(); FileStatus[] listStatus = this.fs.listStatus(wal.getDir()); HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], - this.fs, this.conf, null, null, null); + this.fs, this.conf, null, null, null, null); FileStatus[] listStatus1 = this.fs.listStatus( new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(), "recovered.edits"))); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTable.java hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTable.java deleted file mode 100644 index a51a8d0..0000000 --- hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTable.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.zookeeper; - - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.*; -import org.apache.zookeeper.KeeperException; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category(MediumTests.class) -public class TestZKTable { - private static final Log LOG = LogFactory.getLog(TestZooKeeperNodeTracker.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniZKCluster(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniZKCluster(); - } - - @Test - public void testTableStates() - throws ZooKeeperConnectionException, IOException, KeeperException, InterruptedException { - final TableName name = - TableName.valueOf("testDisabled"); - Abortable abortable = new Abortable() { - @Override - public void abort(String why, Throwable e) { - LOG.info(why, e); - } - - @Override - public boolean isAborted() { - return false; - } - - }; - ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), - name.getNameAsString(), abortable, true); - ZKTable zkt = new ZKTable(zkw); - assertFalse(zkt.isEnabledTable(name)); - assertFalse(zkt.isDisablingTable(name)); - assertFalse(zkt.isDisabledTable(name)); - assertFalse(zkt.isEnablingTable(name)); - assertFalse(zkt.isDisablingOrDisabledTable(name)); - assertFalse(zkt.isDisabledOrEnablingTable(name)); - assertFalse(zkt.isTablePresent(name)); - zkt.setDisablingTable(name); - assertTrue(zkt.isDisablingTable(name)); - assertTrue(zkt.isDisablingOrDisabledTable(name)); - assertFalse(zkt.getDisabledTables().contains(name)); - assertTrue(zkt.isTablePresent(name)); - zkt.setDisabledTable(name); - assertTrue(zkt.isDisabledTable(name)); - assertTrue(zkt.isDisablingOrDisabledTable(name)); - assertFalse(zkt.isDisablingTable(name)); - assertTrue(zkt.getDisabledTables().contains(name)); - assertTrue(zkt.isTablePresent(name)); - zkt.setEnablingTable(name); - assertTrue(zkt.isEnablingTable(name)); - assertTrue(zkt.isDisabledOrEnablingTable(name)); - assertFalse(zkt.isDisabledTable(name)); - assertFalse(zkt.getDisabledTables().contains(name)); - assertTrue(zkt.isTablePresent(name)); - zkt.setEnabledTable(name); - assertTrue(zkt.isEnabledTable(name)); - assertFalse(zkt.isEnablingTable(name)); - assertTrue(zkt.isTablePresent(name)); - zkt.setDeletedTable(name); - assertFalse(zkt.isEnabledTable(name)); - assertFalse(zkt.isDisablingTable(name)); - assertFalse(zkt.isDisabledTable(name)); - assertFalse(zkt.isEnablingTable(name)); - assertFalse(zkt.isDisablingOrDisabledTable(name)); - assertFalse(zkt.isDisabledOrEnablingTable(name)); - assertFalse(zkt.isTablePresent(name)); - } - -} - diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTableStateManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTableStateManager.java new file mode 100644 index 0000000..522d8e6 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTableStateManager.java @@ -0,0 +1,111 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.exceptions.ConsensusException; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestZKTableStateManager { + private static final Log LOG = LogFactory.getLog(TestZKTableStateManager.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniZKCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + @Test + public void testTableStates() + throws ConsensusException, IOException, KeeperException, InterruptedException { + final TableName name = + TableName.valueOf("testDisabled"); + Abortable abortable = new Abortable() { + @Override + public void abort(String why, Throwable e) { + LOG.info(why, e); + } + + @Override + public boolean isAborted() { + return false; + } + + }; + ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + name.getNameAsString(), abortable, true); + TableStateManager zkt = new ZKTableStateManager(zkw); + assertFalse(zkt.isEnabledTable(name)); + assertFalse(zkt.isDisablingTable(name)); + assertFalse(zkt.isDisabledTable(name)); + assertFalse(zkt.isEnablingTable(name)); + assertFalse(zkt.isDisablingOrDisabledTable(name)); + assertFalse(zkt.isDisabledOrEnablingTable(name)); + assertFalse(zkt.isTablePresent(name)); + zkt.setDisablingTable(name); + assertTrue(zkt.isDisablingTable(name)); + assertTrue(zkt.isDisablingOrDisabledTable(name)); + assertFalse(zkt.getDisabledTables().contains(name)); + assertTrue(zkt.isTablePresent(name)); + zkt.setDisabledTable(name); + assertTrue(zkt.isDisabledTable(name)); + assertTrue(zkt.isDisablingOrDisabledTable(name)); + assertFalse(zkt.isDisablingTable(name)); + assertTrue(zkt.getDisabledTables().contains(name)); + assertTrue(zkt.isTablePresent(name)); + zkt.setEnablingTable(name); + assertTrue(zkt.isEnablingTable(name)); + assertTrue(zkt.isDisabledOrEnablingTable(name)); + assertFalse(zkt.isDisabledTable(name)); + assertFalse(zkt.getDisabledTables().contains(name)); + assertTrue(zkt.isTablePresent(name)); + zkt.setEnabledTable(name); + assertTrue(zkt.isEnabledTable(name)); + assertFalse(zkt.isEnablingTable(name)); + assertTrue(zkt.isTablePresent(name)); + zkt.setDeletedTable(name); + assertFalse(zkt.isEnabledTable(name)); + assertFalse(zkt.isDisablingTable(name)); + assertFalse(zkt.isDisabledTable(name)); + assertFalse(zkt.isEnablingTable(name)); + assertFalse(zkt.isDisablingOrDisabledTable(name)); + assertFalse(zkt.isDisabledOrEnablingTable(name)); + assertFalse(zkt.isTablePresent(name)); + } + +} +