diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/HBaseIntegrationTests.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/HBaseIntegrationTests.java
new file mode 100644
index 0000000..58b1ee9
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/HBaseIntegrationTests.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.security.SessionStateConfigUserAuthenticator;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactoryForTest;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration tests with HBase Mini-cluster for HBaseStore
+ */
+public class HBaseIntegrationTests {
+
+  private static final Log LOG = LogFactory.getLog(HBaseIntegrationTests.class.getName());
+
+  protected static HBaseTestingUtility utility;
+  protected static HBaseAdmin admin;
+  protected static Map<String, String> emptyParameters = new HashMap<String, String>();
+  protected static HiveConf conf;
+
+  protected HBaseStore store;
+  protected Driver driver;
+
+  protected static void startMiniCluster() throws Exception {
+    String connectionClassName =
+        System.getProperty(HiveConf.ConfVars.METASTORE_HBASE_CONNECTION_CLASS.varname);
+    boolean testingTephra =
+        connectionClassName != null && connectionClassName.equals(TephraHBaseConnection.class.getName());
+    utility = new HBaseTestingUtility();
+    utility.startMiniCluster();
+    conf = new HiveConf(utility.getConfiguration(), HBaseIntegrationTests.class);
+    admin = utility.getHBaseAdmin();
+    for (String tableName : HBaseReadWrite.tableNames) {
+      List<byte[]> families = HBaseReadWrite.columnFamilies.get(tableName);
+      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+      for (byte[] family : families) {
+        HColumnDescriptor columnDesc = new HColumnDescriptor(family);
+        if (testingTephra) columnDesc.setMaxVersions(Integer.MAX_VALUE);
+        desc.addFamily(columnDesc);
+      }
+      if (testingTephra) desc.addCoprocessor(TransactionProcessor.class.getName());
+      admin.createTable(desc);
+    }
+    admin.close();
+  }
+
+  protected static void shutdownMiniCluster() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  protected void setupConnection() throws IOException {
+
+  }
+
+  protected void setupDriver() {
+    // This chicanery is necessary to make the driver work.  Hive tests need the pfile file
+    // system, while the hbase one uses something else.  So first make sure we've configured our
+    // hbase connection, then get a new config file and populate it as desired.
+    HBaseReadWrite.getInstance(conf);
+    conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
+        "org.apache.hadoop.hive.metastore.hbase.HBaseStore");
+    conf.setBoolVar(HiveConf.ConfVars.METASTORE_FASTPATH, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    // Setup so we can test SQL standard auth
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_TEST_AUTHORIZATION_SQLSTD_HS2_MODE, true);
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        SQLStdHiveAuthorizerFactoryForTest.class.getName());
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
+        SessionStateConfigUserAuthenticator.class.getName());
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED, true);
+    conf.setVar(HiveConf.ConfVars.USERS_IN_ADMIN_ROLE, System.getProperty("user.name"));
+    //HBaseReadWrite.setTestConnection(hconn);
+
+    SessionState.start(new CliSessionState(conf));
+    driver = new Driver(conf);
+  }
+
+  protected void setupHBaseStore() {
+    // Turn off caching, as we want to test actual interaction with HBase
+    conf.setBoolean(HBaseReadWrite.NO_CACHE_CONF, true);
+    store = new HBaseStore();
+    store.setConf(conf);
+  }
+
+}
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/IMockUtils.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/IMockUtils.java
deleted file mode 100644
index c30ac34..0000000
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/IMockUtils.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ -package org.apache.hadoop.hive.metastore.hbase; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.security.SessionStateConfigUserAuthenticator; -import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactoryForTest; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * Integration tests with HBase Mini-cluster for HBaseStore - */ -public class IMockUtils { - - private static final Log LOG = LogFactory.getLog(IMockUtils.class.getName()); - - protected static HBaseTestingUtility utility; - protected static HTableInterface tblTable; - protected static HTableInterface sdTable; - protected static HTableInterface partTable; - protected static HTableInterface dbTable; - protected static HTableInterface funcTable; - protected static HTableInterface roleTable; - protected static HTableInterface globalPrivsTable; - protected static HTableInterface principalRoleMapTable; - protected static Map emptyParameters = new HashMap(); - - @Mock - private HBaseConnection hconn; - protected HBaseStore store; - protected HiveConf conf; - protected Driver driver; - - protected static void startMiniCluster() throws Exception { - utility = new HBaseTestingUtility(); - utility.startMiniCluster(); - byte[][] families = new byte[][]{HBaseReadWrite.CATALOG_CF, HBaseReadWrite.STATS_CF}; - tblTable = utility.createTable(HBaseReadWrite.TABLE_TABLE.getBytes(HBaseUtils.ENCODING), - families); - sdTable = utility.createTable(HBaseReadWrite.SD_TABLE.getBytes(HBaseUtils.ENCODING), - HBaseReadWrite.CATALOG_CF); - partTable = utility.createTable(HBaseReadWrite.PART_TABLE.getBytes(HBaseUtils.ENCODING), - families); - dbTable = utility.createTable(HBaseReadWrite.DB_TABLE.getBytes(HBaseUtils.ENCODING), - HBaseReadWrite.CATALOG_CF); - funcTable = utility.createTable(HBaseReadWrite.FUNC_TABLE.getBytes(HBaseUtils.ENCODING), - HBaseReadWrite.CATALOG_CF); - roleTable = utility.createTable(HBaseReadWrite.ROLE_TABLE.getBytes(HBaseUtils.ENCODING), - HBaseReadWrite.CATALOG_CF); - globalPrivsTable = - utility.createTable(HBaseReadWrite.GLOBAL_PRIVS_TABLE.getBytes(HBaseUtils.ENCODING), - HBaseReadWrite.CATALOG_CF); - principalRoleMapTable = - utility.createTable(HBaseReadWrite.USER_TO_ROLE_TABLE.getBytes(HBaseUtils.ENCODING), - HBaseReadWrite.CATALOG_CF); - } - - protected static void shutdownMiniCluster() throws Exception { - utility.shutdownMiniCluster(); - } - - protected void setupConnection() throws IOException { - MockitoAnnotations.initMocks(this); - Mockito.when(hconn.getHBaseTable(HBaseReadWrite.SD_TABLE)).thenReturn(sdTable); - Mockito.when(hconn.getHBaseTable(HBaseReadWrite.TABLE_TABLE)).thenReturn(tblTable); - Mockito.when(hconn.getHBaseTable(HBaseReadWrite.PART_TABLE)).thenReturn(partTable); - Mockito.when(hconn.getHBaseTable(HBaseReadWrite.DB_TABLE)).thenReturn(dbTable); - Mockito.when(hconn.getHBaseTable(HBaseReadWrite.FUNC_TABLE)).thenReturn(funcTable); - Mockito.when(hconn.getHBaseTable(HBaseReadWrite.ROLE_TABLE)).thenReturn(roleTable); - 
Mockito.when(hconn.getHBaseTable(HBaseReadWrite.GLOBAL_PRIVS_TABLE)).thenReturn( - globalPrivsTable); - Mockito.when(hconn.getHBaseTable(HBaseReadWrite.USER_TO_ROLE_TABLE)).thenReturn( - principalRoleMapTable); - conf = new HiveConf(); - } - - protected void setupDriver() { - conf.setVar(HiveConf.ConfVars.METASTORE_HBASE_CONNECTION_CLASS, HBaseReadWrite.TEST_CONN); - conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); - conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, - "org.apache.hadoop.hive.metastore.hbase.HBaseStore"); - conf.setBoolVar(HiveConf.ConfVars.METASTORE_FASTPATH, true); - conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - // Setup so we can test SQL standard auth - conf.setBoolVar(HiveConf.ConfVars.HIVE_TEST_AUTHORIZATION_SQLSTD_HS2_MODE, true); - conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, - SQLStdHiveAuthorizerFactoryForTest.class.getName()); - conf.setVar(HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, - SessionStateConfigUserAuthenticator.class.getName()); - conf.setBoolVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED, true); - conf.setVar(HiveConf.ConfVars.USERS_IN_ADMIN_ROLE, System.getProperty("user.name")); - HBaseReadWrite.setTestConnection(hconn); - - SessionState.start(new CliSessionState(conf)); - driver = new Driver(conf); - } - - protected void setupHBaseStore() { - // Turn off caching, as we want to test actual interaction with HBase - conf.setBoolean(HBaseReadWrite.NO_CACHE_CONF, true); - conf.setVar(HiveConf.ConfVars.METASTORE_HBASE_CONNECTION_CLASS, HBaseReadWrite.TEST_CONN); - HBaseReadWrite.setTestConnection(hconn); - // HBaseReadWrite hbase = HBaseReadWrite.getInstance(conf); - store = new HBaseStore(); - store.setConf(conf); - } - -} - diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java index 5014217..7bdff18 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java @@ -48,18 +48,18 @@ /** * Test that import from an RDBMS based metastore works */ -public class TestHBaseImport extends IMockUtils { +public class TestHBaseImport extends HBaseIntegrationTests { private static final Log LOG = LogFactory.getLog(TestHBaseStoreIntegration.class.getName()); @BeforeClass public static void startup() throws Exception { - IMockUtils.startMiniCluster(); + HBaseIntegrationTests.startMiniCluster(); } @AfterClass public static void shutdown() throws Exception { - IMockUtils.shutdownMiniCluster(); + HBaseIntegrationTests.shutdownMiniCluster(); } @Before diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseMetastoreSql.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseMetastoreSql.java index fe5e8e2..46506a5 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseMetastoreSql.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseMetastoreSql.java @@ -25,26 +25,27 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; + +import java.io.File; import java.io.IOException; /** * Integration tests with HBase Mini-cluster using actual SQL */ -public class TestHBaseMetastoreSql extends IMockUtils { +public class TestHBaseMetastoreSql 
extends HBaseIntegrationTests { private static final Log LOG = LogFactory.getLog(TestHBaseStoreIntegration.class.getName()); @BeforeClass public static void startup() throws Exception { - IMockUtils.startMiniCluster(); + HBaseIntegrationTests.startMiniCluster(); } @AfterClass public static void shutdown() throws Exception { - IMockUtils.shutdownMiniCluster(); + HBaseIntegrationTests.shutdownMiniCluster(); } @Before diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseStoreIntegration.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseStoreIntegration.java index 2d4e2cb..4ff01a4 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseStoreIntegration.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseStoreIntegration.java @@ -72,7 +72,7 @@ /** * Integration tests with HBase Mini-cluster for HBaseStore */ -public class TestHBaseStoreIntegration extends IMockUtils { +public class TestHBaseStoreIntegration extends HBaseIntegrationTests { private static final Log LOG = LogFactory.getLog(TestHBaseStoreIntegration.class.getName()); @@ -80,12 +80,12 @@ @BeforeClass public static void startup() throws Exception { - IMockUtils.startMiniCluster(); + HBaseIntegrationTests.startMiniCluster(); } @AfterClass public static void shutdown() throws Exception { - IMockUtils.shutdownMiniCluster(); + HBaseIntegrationTests.shutdownMiniCluster(); } @Before @@ -828,7 +828,7 @@ public void userToRoleMap() throws Exception { store.grantRole(role1, user1, PrincipalType.USER, "bob", PrincipalType.USER, false); store.grantRole(role1, roleName2, PrincipalType.ROLE, "admin", PrincipalType.ROLE, true); - List roles = HBaseReadWrite.getInstance(conf).getUserRoles(user1); + List roles = HBaseReadWrite.getInstance().getUserRoles(user1); Assert.assertEquals(2, roles.size()); String[] roleNames = roles.toArray(new String[roles.size()]); Arrays.sort(roleNames); @@ -874,7 +874,7 @@ public void userToRoleMapOnDrop() throws Exception { store.grantRole(role1, roleName2, PrincipalType.ROLE, "admin", PrincipalType.ROLE, true); store.grantRole(role1, user2, PrincipalType.USER, "bob", PrincipalType.USER, false); - List roles = HBaseReadWrite.getInstance(conf).getUserRoles(user2); + List roles = HBaseReadWrite.getInstance().getUserRoles(user2); Assert.assertEquals(2, roles.size()); String[] roleNames = roles.toArray(new String[roles.size()]); Arrays.sort(roleNames); @@ -1112,7 +1112,7 @@ private void checkRoleRemovedFromAllPrivileges(HiveObjectType objectType, String List pgi = null; switch (objectType) { case GLOBAL: - pgi = HBaseReadWrite.getInstance(conf).getGlobalPrivs().getRolePrivileges().get(roleName); + pgi = HBaseReadWrite.getInstance().getGlobalPrivs().getRolePrivileges().get(roleName); break; case DATABASE: diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestStorageDescriptorSharing.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestStorageDescriptorSharing.java index 548576e..decfa4a 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestStorageDescriptorSharing.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestStorageDescriptorSharing.java @@ -40,7 +40,7 @@ /** * Integration tests with HBase Mini-cluster for HBaseStore */ -public class TestStorageDescriptorSharing extends IMockUtils { +public class TestStorageDescriptorSharing extends HBaseIntegrationTests { 
   private static final Log LOG = LogFactory.getLog(TestHBaseStoreIntegration.class.getName());
 
@@ -48,12 +48,12 @@
 
   @BeforeClass
   public static void startup() throws Exception {
-    IMockUtils.startMiniCluster();
+    HBaseIntegrationTests.startMiniCluster();
   }
 
   @AfterClass
   public static void shutdown() throws Exception {
-    IMockUtils.shutdownMiniCluster();
+    HBaseIntegrationTests.shutdownMiniCluster();
   }
 
   @Before
diff --git metastore/pom.xml metastore/pom.xml
index adde0b5..0ec855d 100644
--- metastore/pom.xml
+++ metastore/pom.xml
@@ -126,6 +126,21 @@
       <artifactId>libthrift</artifactId>
       <version>${libthrift.version}</version>
     </dependency>
+    <dependency>
+      <groupId>co.cask.tephra</groupId>
+      <artifactId>tephra-api</artifactId>
+      <version>${tephra.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>co.cask.tephra</groupId>
+      <artifactId>tephra-core</artifactId>
+      <version>${tephra.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>co.cask.tephra</groupId>
+      <artifactId>tephra-hbase-compat-0.98</artifactId>
+      <version>${tephra.version}</version>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseConnection.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseConnection.java
index 68acc1d..696e588 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseConnection.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseConnection.java
@@ -61,6 +61,13 @@
   void rollbackTransaction() throws IOException;
 
   /**
+   * Flush commits.  A no-op for transaction implementations since they will write at commit time.
+   * @param htab Table to flush
+   * @throws IOException
+   */
+  void flush(HTableInterface htab) throws IOException;
+
+  /**
    * Create a new table
    * @param tableName name of the table
    * @param columnFamilies name of the column families in the table
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
index 98f7ff1..b0dc707 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
@@ -87,6 +87,25 @@
   @VisibleForTesting final static byte[] CATALOG_CF = "c".getBytes(HBaseUtils.ENCODING);
   @VisibleForTesting final static byte[] STATS_CF = "s".getBytes(HBaseUtils.ENCODING);
   @VisibleForTesting final static String NO_CACHE_CONF = "no.use.cache";
+  /**
+   * List of tables in HBase
+   */
+  final static String[] tableNames = { DB_TABLE, FUNC_TABLE, GLOBAL_PRIVS_TABLE, PART_TABLE,
+                                        USER_TO_ROLE_TABLE, ROLE_TABLE, SD_TABLE, TABLE_TABLE };
+  final static Map<String, List<byte[]>> columnFamilies =
+      new HashMap<String, List<byte[]>> (tableNames.length);
+
+  static {
+    columnFamilies.put(DB_TABLE, Arrays.asList(CATALOG_CF));
+    columnFamilies.put(FUNC_TABLE, Arrays.asList(CATALOG_CF));
+    columnFamilies.put(GLOBAL_PRIVS_TABLE, Arrays.asList(CATALOG_CF));
+    columnFamilies.put(PART_TABLE, Arrays.asList(CATALOG_CF, STATS_CF));
+    columnFamilies.put(USER_TO_ROLE_TABLE, Arrays.asList(CATALOG_CF));
+    columnFamilies.put(ROLE_TABLE, Arrays.asList(CATALOG_CF));
+    columnFamilies.put(SD_TABLE, Arrays.asList(CATALOG_CF));
+    columnFamilies.put(TABLE_TABLE, Arrays.asList(CATALOG_CF, STATS_CF));
+  }
+
   private final static byte[] CATALOG_COL = "cat".getBytes(HBaseUtils.ENCODING);
   private final static byte[] ROLES_COL = "roles".getBytes(HBaseUtils.ENCODING);
   private final static byte[] REF_COUNT_COL = "ref".getBytes(HBaseUtils.ENCODING);
@@ -96,8 +115,6 @@
   @VisibleForTesting final static String TEST_CONN = "test_connection";
   private static HBaseConnection testConn;
 
-  private final static String[] tableNames = { DB_TABLE, FUNC_TABLE, GLOBAL_PRIVS_TABLE, PART_TABLE,
-      USER_TO_ROLE_TABLE, ROLE_TABLE, SD_TABLE, TABLE_TABLE };
   static final private Log LOG =
LogFactory.getLog(HBaseReadWrite.class.getName()); private static ThreadLocal self = new ThreadLocal() { @@ -234,12 +251,16 @@ static synchronized void createTablesIfNotExist() throws IOException { if (!tablesCreated) { for (String name : tableNames) { if (self.get().conn.getHBaseTable(name, true) == null) { + List families = columnFamilies.get(name); + self.get().conn.createHBaseTable(name, families); + /* List columnFamilies = new ArrayList(); columnFamilies.add(CATALOG_CF); if (TABLE_TABLE.equals(name) || PART_TABLE.equals(name)) { columnFamilies.add(STATS_CF); } self.get().conn.createHBaseTable(name, columnFamilies); + */ } } tablesCreated = true; @@ -541,7 +562,7 @@ void putPartitions(List partitions) throws IOException { } HTableInterface htab = conn.getHBaseTable(PART_TABLE); htab.put(puts); - htab.flushCommits(); + conn.flush(htab); } void replacePartitions(List oldParts, List newParts) throws IOException { @@ -567,7 +588,7 @@ void replacePartitions(List oldParts, List newParts) throw } HTableInterface htab = conn.getHBaseTable(PART_TABLE); htab.put(puts); - htab.flushCommits(); + conn.flush(htab); } /** @@ -1014,7 +1035,7 @@ void removeRoleGrants(String roleName) throws IOException { if (puts.size() > 0) { HTableInterface htab = conn.getHBaseTable(ROLE_TABLE); htab.put(puts); - htab.flushCommits(); + conn.flush(htab); } // Remove any global privileges held by this role @@ -1043,7 +1064,7 @@ void removeRoleGrants(String roleName) throws IOException { if (puts.size() > 0) { HTableInterface htab = conn.getHBaseTable(DB_TABLE); htab.put(puts); - htab.flushCommits(); + conn.flush(htab); } // Finally, walk the table table @@ -1068,7 +1089,7 @@ void removeRoleGrants(String roleName) throws IOException { if (puts.size() > 0) { HTableInterface htab = conn.getHBaseTable(TABLE_TABLE); htab.put(puts); - htab.flushCommits(); + conn.flush(htab); } } @@ -1354,7 +1375,7 @@ void decrementStorageDescriptorRefCount(StorageDescriptor sd) throws IOException Put p = new Put(key); p.add(CATALOG_CF, REF_COUNT_COL, Integer.toString(refCnt).getBytes(HBaseUtils.ENCODING)); htab.put(p); - htab.flushCommits(); + conn.flush(htab); } } @@ -1386,7 +1407,7 @@ void decrementStorageDescriptorRefCount(StorageDescriptor sd) throws IOException p.add(CATALOG_CF, REF_COUNT_COL, Integer.toString(refCnt).getBytes(HBaseUtils.ENCODING)); htab.put(p); } - htab.flushCommits(); + conn.flush(htab); return key; } @@ -1699,7 +1720,7 @@ private void store(String table, byte[] key, byte[] colFam, byte[] colName, byte Put p = new Put(key); p.add(colFam, colName, obj); htab.put(p); - htab.flushCommits(); + conn.flush(htab); } private void store(String table, byte[] key, byte[] colFam, byte[][] colName, byte[][] obj) @@ -1710,7 +1731,7 @@ private void store(String table, byte[] key, byte[] colFam, byte[][] colName, by p.add(colFam, colName[i], obj[i]); } htab.put(p); - htab.flushCommits(); + conn.flush(htab); } private byte[] read(String table, byte[] key, byte[] colFam, byte[] colName) throws IOException { diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 0b1135e..79a61d4 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -102,56 +102,80 @@ public void shutdown() { @Override public boolean openTransaction() { - if (txnNestLevel++ == 0) getHBase().begin(); + if (txnNestLevel++ <= 0) { + LOG.debug("Opening 
HBase transaction"); + getHBase().begin(); + txnNestLevel = 1; + } return true; } @Override public boolean commitTransaction() { - if (txnNestLevel-- < 1) getHBase().commit(); + if (--txnNestLevel == 0) { + LOG.debug("Committing HBase transaction"); + getHBase().commit(); + } return true; } @Override public void rollbackTransaction() { txnNestLevel = 0; + LOG.debug("Rolling back HBase transaction"); getHBase().rollback(); } @Override public void createDatabase(Database db) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); try { + // HiveMetaStore already checks for existence of the database, don't recheck getHBase().putDb(db); + commit = true; } catch (IOException e) { LOG.error("Unable to create database ", e); throw new MetaException("Unable to read from or write to hbase " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public Database getDatabase(String name) throws NoSuchObjectException { + boolean commit = false; + openTransaction(); try { Database db = getHBase().getDb(name); if (db == null) { throw new NoSuchObjectException("Unable to find db " + name); } + commit = true; return db; } catch (IOException e) { LOG.error("Unable to get db", e); throw new NoSuchObjectException("Error reading db " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException { + boolean commit = false; + openTransaction(); try { getHBase().deleteDb(dbname); + commit = true; return true; } catch (IOException e) { LOG.error("Unable to delete db" + e); throw new MetaException("Unable to drop database " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -160,25 +184,35 @@ public boolean alterDatabase(String dbname, Database db) throws NoSuchObjectExce MetaException { // ObjectStore fetches the old db before updating it, but I can't see the possible value of // that since the caller will have needed to call getDatabase to have the db object. + boolean commit = false; + openTransaction(); try { getHBase().putDb(db); + commit = true; return true; } catch (IOException e) { LOG.error("Unable to alter database ", e); throw new MetaException("Unable to read from or write to hbase " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public List getDatabases(String pattern) throws MetaException { + boolean commit = false; + openTransaction(); try { List dbs = getHBase().scanDatabases(likeToRegex(pattern)); List dbNames = new ArrayList(dbs.size()); for (Database db : dbs) dbNames.add(db.getName()); + commit = true; return dbNames; } catch (IOException e) { LOG.error("Unable to get databases ", e); throw new MetaException("Unable to get databases, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -204,61 +238,86 @@ public boolean dropType(String typeName) { @Override public void createTable(Table tbl) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); // HiveMetaStore above us checks if the table already exists, so we can blindly store it here. 
try { getHBase().putTable(tbl); + commit = true; } catch (IOException e) { LOG.error("Unable to create table ", e); throw new MetaException("Unable to read from or write to hbase " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public boolean dropTable(String dbName, String tableName) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { + boolean commit = false; + openTransaction(); try { getHBase().deleteTable(dbName, tableName); + commit = true; return true; } catch (IOException e) { LOG.error("Unable to delete db" + e); throw new MetaException("Unable to drop table " + tableNameForErrorMsg(dbName, tableName)); + } finally { + commitOrRoleBack(commit); } } @Override public Table getTable(String dbName, String tableName) throws MetaException { + boolean commit = false; + openTransaction(); try { Table table = getHBase().getTable(dbName, tableName); if (table == null) { LOG.debug("Unable to find table " + tableNameForErrorMsg(dbName, tableName)); } + commit = true; return table; } catch (IOException e) { LOG.error("Unable to get table", e); throw new MetaException("Error reading table " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); try { getHBase().putPartition(part); + commit = true; return true; } catch (IOException e) { LOG.error("Unable to add partition", e); throw new MetaException("Unable to read from or write to hbase " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public boolean addPartitions(String dbName, String tblName, List parts) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); try { getHBase().putPartitions(parts); + commit = true; return true; } catch (IOException e) { LOG.error("Unable to add partitions", e); throw new MetaException("Unable to read from or write to hbase " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -271,87 +330,125 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p @Override public Partition getPartition(String dbName, String tableName, List part_vals) throws MetaException, NoSuchObjectException { + boolean commit = false; + openTransaction(); try { Partition part = getHBase().getPartition(dbName, tableName, part_vals); if (part == null) { throw new NoSuchObjectException("Unable to find partition " + partNameForErrorMsg(dbName, tableName, part_vals)); } + commit = true; return part; } catch (IOException e) { LOG.error("Unable to get partition", e); throw new MetaException("Error reading partition " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public boolean doesPartitionExist(String dbName, String tableName, List part_vals) throws MetaException, NoSuchObjectException { + boolean commit = false; + openTransaction(); try { - return getHBase().getPartition(dbName, tableName, part_vals) != null; + boolean exists = getHBase().getPartition(dbName, tableName, part_vals) != null; + commit = true; + return exists; } catch (IOException e) { LOG.error("Unable to get partition", e); throw new MetaException("Error reading partition " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public boolean dropPartition(String dbName, String tableName, List part_vals) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { + 
boolean commit = false; + openTransaction(); try { getHBase().deletePartition(dbName, tableName, part_vals); + commit = true; return true; } catch (IOException e) { LOG.error("Unable to delete db" + e); throw new MetaException("Unable to drop partition " + partNameForErrorMsg(dbName, tableName, part_vals)); + } finally { + commitOrRoleBack(commit); } } @Override public List getPartitions(String dbName, String tableName, int max) throws MetaException, NoSuchObjectException { + boolean commit = false; + openTransaction(); try { - return getHBase().scanPartitionsInTable(dbName, tableName, max); + List parts = getHBase().scanPartitionsInTable(dbName, tableName, max); + commit = true; + return parts; } catch (IOException e) { LOG.error("Unable to get partitions", e); throw new MetaException("Error scanning partitions"); + } finally { + commitOrRoleBack(commit); } } @Override public void alterTable(String dbname, String name, Table newTable) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); try { Table oldTable = getHBase().getTable(dbname, name); getHBase().replaceTable(oldTable, newTable); + commit = true; } catch (IOException e) { LOG.error("Unable to alter table " + tableNameForErrorMsg(dbname, name), e); throw new MetaException("Unable to alter table " + tableNameForErrorMsg(dbname, name)); + } finally { + commitOrRoleBack(commit); } } @Override public List getTables(String dbName, String pattern) throws MetaException { + boolean commit = false; + openTransaction(); try { List tables = getHBase().scanTables(dbName, likeToRegex(pattern)); List tableNames = new ArrayList(tables.size()); for (Table table : tables) tableNames.add(table.getTableName()); + commit = true; return tableNames; } catch (IOException e) { LOG.error("Unable to get tables ", e); throw new MetaException("Unable to get tables, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public List
getTableObjectsByName(String dbname, List tableNames) throws MetaException, UnknownDBException { + boolean commit = false; + openTransaction(); try { - return getHBase().getTables(dbname, tableNames); + List
tables = getHBase().getTables(dbname, tableNames); + commit = true; + return tables; } catch (IOException e) { LOG.error("Unable to get tables ", e); throw new MetaException("Unable to get tables, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -370,6 +467,8 @@ public void alterTable(String dbname, String name, Table newTable) throws Invali @Override public List listPartitionNames(String db_name, String tbl_name, short max_parts) throws MetaException { + boolean commit = false; + openTransaction(); try { List parts = getHBase().scanPartitionsInTable(db_name, tbl_name, max_parts); if (parts == null) return null; @@ -378,10 +477,13 @@ public void alterTable(String dbname, String name, Table newTable) throws Invali for (Partition p : parts) { names.add(buildExternalPartName(table, p)); } + commit = true; return names; } catch (IOException e) { LOG.error("Unable to get partitions", e); throw new MetaException("Error scanning partitions"); + } finally { + commitOrRoleBack(commit); } } @@ -395,12 +497,17 @@ public void alterTable(String dbname, String name, Table newTable) throws Invali @Override public void alterPartition(String db_name, String tbl_name, List part_vals, Partition new_part) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); try { Partition oldPart = getHBase().getPartition(db_name, tbl_name, part_vals); getHBase().replacePartition(oldPart, new_part); + commit = true; } catch (IOException e) { LOG.error("Unable to add partition", e); throw new MetaException("Unable to read from or write to hbase " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -408,12 +515,17 @@ public void alterPartition(String db_name, String tbl_name, List part_va public void alterPartitions(String db_name, String tbl_name, List> part_vals_list, List new_parts) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); try { List oldParts = getHBase().getPartitions(db_name, tbl_name, part_vals_list); getHBase().replacePartitions(oldParts, new_parts); + commit = true; } catch (IOException e) { LOG.error("Unable to add partition", e); throw new MetaException("Unable to read from or write to hbase " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -460,8 +572,14 @@ public void alterIndex(String dbname, String baseTblName, String name, Index new final ExpressionTree exprTree = (filter != null && !filter.isEmpty()) ? PartFilterExprUtil .getFilterParser(filter).tree : ExpressionTree.EMPTY_TREE; List result = new ArrayList(); - getPartitionsByExprInternal(dbName, tblName, exprTree, maxParts, result); - return result; + boolean commit = false; + openTransaction(); + try { + getPartitionsByExprInternal(dbName, tblName, exprTree, maxParts, result); + return result; + } finally { + commitOrRoleBack(commit); + } } @Override @@ -472,7 +590,13 @@ public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, // TODO: investigate if there should be any role for defaultPartitionName in this // implementation. direct sql code path in ObjectStore does not use it. 
- return getPartitionsByExprInternal(dbName, tblName, exprTree, maxParts, result); + boolean commit = false; + openTransaction(); + try { + return getPartitionsByExprInternal(dbName, tblName, exprTree, maxParts, result); + } finally { + commitOrRoleBack(commit); + } } private boolean getPartitionsByExprInternal(String dbName, String tblName, @@ -592,20 +716,27 @@ public boolean addRole(String roleName, String ownerName) throws InvalidObjectEx MetaException, NoSuchObjectException { int now = (int)(System.currentTimeMillis()/1000); Role role = new Role(roleName, now, ownerName); + boolean commit = false; + openTransaction(); try { if (getHBase().getRole(roleName) != null) { throw new InvalidObjectException("Role " + roleName + " already exists"); } getHBase().putRole(role); + commit = true; return true; } catch (IOException e) { LOG.error("Unable to create role ", e); throw new MetaException("Unable to read from or write to hbase " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public boolean removeRole(String roleName) throws MetaException, NoSuchObjectException { + boolean commit = false; + openTransaction(); try { Set usersInRole = getHBase().findAllUsersInRole(roleName); getHBase().deleteRole(roleName); @@ -613,10 +744,13 @@ public boolean removeRole(String roleName) throws MetaException, NoSuchObjectExc for (String user : usersInRole) { getHBase().buildRoleMapForUser(user); } + commit = true; return true; } catch (IOException e) { LOG.error("Unable to delete role" + e); throw new MetaException("Unable to drop role " + roleName); + } finally { + commitOrRoleBack(commit); } } @@ -624,6 +758,8 @@ public boolean removeRole(String roleName) throws MetaException, NoSuchObjectExc public boolean grantRole(Role role, String userName, PrincipalType principalType, String grantor, PrincipalType grantorType, boolean grantOption) throws MetaException, NoSuchObjectException, InvalidObjectException { + boolean commit = false; + openTransaction(); try { Set usersToRemap = findUsersToRemapRolesFor(role, userName, principalType); HbaseMetastoreProto.RoleGrantInfo.Builder builder = @@ -643,16 +779,21 @@ public boolean grantRole(Role role, String userName, PrincipalType principalType for (String user : usersToRemap) { getHBase().buildRoleMapForUser(user); } + commit = true; return true; } catch (IOException e) { LOG.error("Unable to grant role", e); throw new MetaException("Unable to grant role " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public boolean revokeRole(Role role, String userName, PrincipalType principalType, boolean grantOption) throws MetaException, NoSuchObjectException { + boolean commit = false; + openTransaction(); // This can have a couple of different meanings. If grantOption is true, then this is only // revoking the grant option, the role itself doesn't need to be removed. If it is false // then we need to remove the userName from the role altogether. 
@@ -667,16 +808,21 @@ public boolean revokeRole(Role role, String userName, PrincipalType principalTyp getHBase().buildRoleMapForUser(user); } } + commit = true; return true; } catch (IOException e) { LOG.error("Unable to revoke role " + role.getRoleName() + " from " + userName, e); throw new MetaException("Unable to revoke role " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public PrincipalPrivilegeSet getUserPrivilegeSet(String userName, List groupNames) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); try { PrincipalPrivilegeSet pps = new PrincipalPrivilegeSet(); PrincipalPrivilegeSet global = getHBase().getGlobalPrivs(); @@ -700,10 +846,13 @@ public PrincipalPrivilegeSet getUserPrivilegeSet(String userName, List g } } } + commit = true; return pps; } catch (IOException e) { LOG.error("Unable to get db privileges for user", e); throw new MetaException("Unable to get db privileges for user, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -711,6 +860,8 @@ public PrincipalPrivilegeSet getUserPrivilegeSet(String userName, List g public PrincipalPrivilegeSet getDBPrivilegeSet(String dbName, String userName, List groupNames) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); try { PrincipalPrivilegeSet pps = new PrincipalPrivilegeSet(); Database db = getHBase().getDb(dbName); @@ -736,10 +887,13 @@ public PrincipalPrivilegeSet getDBPrivilegeSet(String dbName, String userName, } } } + commit = true; return pps; } catch (IOException e) { LOG.error("Unable to get db privileges for user", e); throw new MetaException("Unable to get db privileges for user, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -747,6 +901,8 @@ public PrincipalPrivilegeSet getDBPrivilegeSet(String dbName, String userName, public PrincipalPrivilegeSet getTablePrivilegeSet(String dbName, String tableName, String userName, List groupNames) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); try { PrincipalPrivilegeSet pps = new PrincipalPrivilegeSet(); Table table = getHBase().getTable(dbName, tableName); @@ -771,10 +927,13 @@ public PrincipalPrivilegeSet getTablePrivilegeSet(String dbName, String tableNam } } } + commit = true; return pps; } catch (IOException e) { LOG.error("Unable to get db privileges for user", e); throw new MetaException("Unable to get db privileges for user, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -802,6 +961,8 @@ public PrincipalPrivilegeSet getColumnPrivilegeSet(String dbName, String tableNa PrincipalType principalType) { List grants; List privileges = new ArrayList(); + boolean commit = false; + openTransaction(); try { PrincipalPrivilegeSet pps = getHBase().getGlobalPrivs(); if (pps == null) return privileges; @@ -827,9 +988,12 @@ public PrincipalPrivilegeSet getColumnPrivilegeSet(String dbName, String tableNa privileges.add(new HiveObjectPrivilege(new HiveObjectRef(HiveObjectType.GLOBAL, null, null, null, null), principalName, principalType, pgi)); } + commit = true; return privileges; } catch (IOException e) { throw new RuntimeException(e); + } finally { + commitOrRoleBack(commit); } } @@ -839,6 +1003,8 @@ public PrincipalPrivilegeSet getColumnPrivilegeSet(String dbName, String tableNa String dbName) { List grants; List privileges = new ArrayList(); + boolean commit = false; + openTransaction(); try { Database db = getHBase().getDb(dbName); if (db == null) return 
privileges; @@ -866,9 +1032,12 @@ public PrincipalPrivilegeSet getColumnPrivilegeSet(String dbName, String tableNa privileges.add(new HiveObjectPrivilege(new HiveObjectRef(HiveObjectType.DATABASE, dbName, null, null, null), principalName, principalType, pgi)); } + commit = true; return privileges; } catch (IOException e) { throw new RuntimeException(e); + } finally { + commitOrRoleBack(commit); } } @@ -879,6 +1048,8 @@ public PrincipalPrivilegeSet getColumnPrivilegeSet(String dbName, String tableNa String tableName) { List grants; List privileges = new ArrayList(); + boolean commit = false; + openTransaction(); try { Table table = getHBase().getTable(dbName, tableName); if (table == null) return privileges; @@ -906,9 +1077,12 @@ public PrincipalPrivilegeSet getColumnPrivilegeSet(String dbName, String tableNa privileges.add(new HiveObjectPrivilege(new HiveObjectRef(HiveObjectType.TABLE, dbName, tableName, null, null), principalName, principalType, pgi)); } + commit = true; return privileges; } catch (IOException e) { throw new RuntimeException(e); + } finally { + commitOrRoleBack(commit); } } @@ -947,40 +1121,55 @@ public PrincipalPrivilegeSet getColumnPrivilegeSet(String dbName, String tableNa @Override public boolean grantPrivileges(PrivilegeBag privileges) throws InvalidObjectException, MetaException, NoSuchObjectException { - for (HiveObjectPrivilege priv : privileges.getPrivileges()) { - // Locate the right object to deal with - PrivilegeInfo privilegeInfo = findPrivilegeToGrantOrRevoke(priv); - - // Now, let's see if we've already got this privilege - for (PrivilegeGrantInfo info : privilegeInfo.grants) { - if (info.getPrivilege().equals(priv.getGrantInfo().getPrivilege())) { - throw new InvalidObjectException(priv.getPrincipalName() + " already has " + - priv.getGrantInfo().getPrivilege() + " on " + privilegeInfo.typeErrMsg); + boolean commit = false; + openTransaction(); + try { + for (HiveObjectPrivilege priv : privileges.getPrivileges()) { + // Locate the right object to deal with + PrivilegeInfo privilegeInfo = findPrivilegeToGrantOrRevoke(priv); + + // Now, let's see if we've already got this privilege + for (PrivilegeGrantInfo info : privilegeInfo.grants) { + if (info.getPrivilege().equals(priv.getGrantInfo().getPrivilege())) { + throw new InvalidObjectException(priv.getPrincipalName() + " already has " + + priv.getGrantInfo().getPrivilege() + " on " + privilegeInfo.typeErrMsg); + } } - } - privilegeInfo.grants.add(priv.getGrantInfo()); + privilegeInfo.grants.add(priv.getGrantInfo()); - writeBackGrantOrRevoke(priv, privilegeInfo); + writeBackGrantOrRevoke(priv, privilegeInfo); + } + commit = true; + return true; + } finally { + commitOrRoleBack(commit); } - return true; } @Override public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption) throws InvalidObjectException, MetaException, NoSuchObjectException { - for (HiveObjectPrivilege priv : privileges.getPrivileges()) { - PrivilegeInfo privilegeInfo = findPrivilegeToGrantOrRevoke(priv); - - for (int i = 0; i < privilegeInfo.grants.size(); i++) { - if (privilegeInfo.grants.get(i).getPrivilege().equals(priv.getGrantInfo().getPrivilege())) { - if (grantOption) privilegeInfo.grants.get(i).setGrantOption(false); - else privilegeInfo.grants.remove(i); - break; + boolean commit = false; + openTransaction(); + try { + for (HiveObjectPrivilege priv : privileges.getPrivileges()) { + PrivilegeInfo privilegeInfo = findPrivilegeToGrantOrRevoke(priv); + + for (int i = 0; i < privilegeInfo.grants.size(); i++) { + if 
(privilegeInfo.grants.get(i).getPrivilege().equals( + priv.getGrantInfo().getPrivilege())) { + if (grantOption) privilegeInfo.grants.get(i).setGrantOption(false); + else privilegeInfo.grants.remove(i); + break; + } } + writeBackGrantOrRevoke(priv, privilegeInfo); } - writeBackGrantOrRevoke(priv, privilegeInfo); + commit = true; + return true; + } finally { + commitOrRoleBack(commit); } - return true; } private static class PrivilegeInfo { @@ -1103,48 +1292,67 @@ private void writeBackGrantOrRevoke(HiveObjectPrivilege priv, PrivilegeInfo pi) @Override public Role getRole(String roleName) throws NoSuchObjectException { + boolean commit = false; + openTransaction(); try { Role role = getHBase().getRole(roleName); if (role == null) { throw new NoSuchObjectException("Unable to find role " + roleName); } + commit = true; return role; } catch (IOException e) { LOG.error("Unable to get role", e); throw new NoSuchObjectException("Error reading table " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public List listRoleNames() { + boolean commit = false; + openTransaction(); try { List roles = getHBase().scanRoles(); List roleNames = new ArrayList(roles.size()); for (Role role : roles) roleNames.add(role.getRoleName()); + commit = true; return roleNames; } catch (IOException e) { throw new RuntimeException(e); + } finally { + commitOrRoleBack(commit); } } @Override public List listRoles(String principalName, PrincipalType principalType) { List roles = new ArrayList(); + boolean commit = false; + openTransaction(); try { - roles.addAll(getHBase().getPrincipalDirectRoles(principalName, principalType)); - } catch (IOException e) { - throw new RuntimeException(e); - } - // Add the public role if this is a user - if (principalType == PrincipalType.USER) { - roles.add(new Role(HiveMetaStore.PUBLIC, 0, null)); + try { + roles.addAll(getHBase().getPrincipalDirectRoles(principalName, principalType)); + } catch (IOException e) { + throw new RuntimeException(e); + } + // Add the public role if this is a user + if (principalType == PrincipalType.USER) { + roles.add(new Role(HiveMetaStore.PUBLIC, 0, null)); + } + commit = true; + return roles; + } finally { + commitOrRoleBack(commit); } - return roles; } @Override public List listRolesWithGrants(String principalName, PrincipalType principalType) { + boolean commit = false; + openTransaction(); try { List roles = listRoles(principalName, principalType); List rpgs = new ArrayList(roles.size()); @@ -1161,14 +1369,19 @@ public Role getRole(String roleName) throws NoSuchObjectException { } } } + commit = true; return rpgs; } catch (Exception e) { throw new RuntimeException(e); + } finally { + commitOrRoleBack(commit); } } @Override public List listRoleMembers(String roleName) { + boolean commit = false; + openTransaction(); try { HbaseMetastoreProto.RoleGrantInfoList gil = getHBase().getRolePrincipals(roleName); List roleMaps = new ArrayList(gil.getGrantInfoList().size()); @@ -1178,9 +1391,12 @@ public Role getRole(String roleName) throws NoSuchObjectException { giw.getGrantOption(), (int)giw.getAddTime(), giw.getGrantor(), HBaseUtils.convertPrincipalTypes(giw.getGrantorType()))); } + commit = true; return roleMaps; } catch (Exception e) { throw new RuntimeException(e); + } finally { + commitOrRoleBack(commit); } } @@ -1220,49 +1436,73 @@ public Partition getPartitionWithAuth(String dbName, String tblName, List groupNames) throws MetaException, NoSuchObjectException { // We don't handle auth info with partitions + boolean commit = false; + 
openTransaction(); try { - return getHBase().scanPartitions(db_name, tbl_name, part_vals, max_parts); + List parts = getHBase().scanPartitions(db_name, tbl_name, part_vals, max_parts); + commit = true; + return parts; } catch (IOException e) { LOG.error("Unable to list partition names", e); throw new MetaException("Failed to list part names, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override - public boolean updateTableColumnStatistics(ColumnStatistics colStats) - throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + public boolean updateTableColumnStatistics(ColumnStatistics colStats) throws + NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + boolean commit = false; + openTransaction(); try { getHBase().updateStatistics(colStats.getStatsDesc().getDbName(), colStats.getStatsDesc().getTableName(), null, null, colStats); + commit = true; return true; } catch (IOException e) { LOG.error("Unable to update column statistics", e); throw new MetaException("Failed to update column statistics, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override - public boolean updatePartitionColumnStatistics(ColumnStatistics statsObj, List partVals) - throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + public boolean updatePartitionColumnStatistics(ColumnStatistics statsObj, + List partVals) throws + NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + boolean commit = false; + openTransaction(); try { getHBase().updateStatistics(statsObj.getStatsDesc().getDbName(), - statsObj.getStatsDesc().getTableName(), statsObj.getStatsDesc().getPartName(), partVals, - statsObj); + statsObj.getStatsDesc().getTableName(), statsObj.getStatsDesc().getPartName(), + partVals, statsObj); + commit = true; return true; } catch (IOException e) { LOG.error("Unable to update column statistics", e); throw new MetaException("Failed to update column statistics, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public ColumnStatistics getTableColumnStatistics(String dbName, String tableName, - List colName) throws MetaException, NoSuchObjectException { + List colName) throws MetaException, + NoSuchObjectException { + boolean commit = false; + openTransaction(); try { - return getHBase().getTableStatistics(dbName, tableName, colName); + ColumnStatistics cs = getHBase().getTableStatistics(dbName, tableName, colName); + commit = true; + return cs; } catch (IOException e) { LOG.error("Unable to fetch column statistics", e); throw new MetaException("Failed to fetch column statistics, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -1273,11 +1513,19 @@ public ColumnStatistics getTableColumnStatistics(String dbName, String tableName for (String partName : partNames) { partVals.add(partNameToVals(partName)); } + for (String partName : partNames) partVals.add(partNameToVals(partName)); + boolean commit = false; + openTransaction(); try { - return getHBase().getPartitionStatistics(dbName, tblName, partNames, partVals, colNames); + List cs = + getHBase().getPartitionStatistics(dbName, tblName, partNames, partVals, colNames); + commit = true; + return cs; } catch (IOException e) { LOG.error("Unable to fetch column statistics", e); throw new MetaException("Failed fetching column statistics, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -1308,11 +1556,17 @@ public AggrStats 
get_aggr_stats_for(String dbName, String tblName, List for (String partName : partNames) { partVals.add(partNameToVals(partName)); } + boolean commit = false; + openTransaction(); try { - return getHBase().getAggrStats(dbName, tblName, partNames, partVals, colNames); + AggrStats stats = getHBase().getAggrStats(dbName, tblName, partNames, partVals, colNames); + commit = true; + return stats; } catch (IOException e) { LOG.error("Unable to fetch aggregate column statistics", e); throw new MetaException("Failed fetching aggregate column statistics, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -1364,14 +1618,7 @@ public boolean removeMasterKey(Integer keySeq) { @Override public void verifySchema() throws MetaException { - try { - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { - getHBase().createTablesIfNotExist(); - } - } catch (IOException e) { - LOG.fatal("Unable to verify schema ", e); - throw new MetaException("Unable to verify schema"); - } + } @Override @@ -1387,13 +1634,18 @@ public void setMetaStoreSchemaVersion(String version, String comment) throws Met @Override public void dropPartitions(String dbName, String tblName, List partNames) throws MetaException, NoSuchObjectException { + boolean commit = false; + openTransaction(); try { for (String partName : partNames) { dropPartition(dbName, tblName, partNameToVals(partName)); } + commit = true; } catch (Exception e) { LOG.error("Unable to drop partitions", e); throw new NoSuchObjectException("Failure dropping partitions, " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @@ -1401,6 +1653,8 @@ public void dropPartitions(String dbName, String tblName, List partNames public List listPrincipalDBGrantsAll(String principalName, PrincipalType principalType) { List privileges = new ArrayList(); + boolean commit = false; + openTransaction(); try { List dbs = getHBase().scanDatabases(null); for (Database db : dbs) { @@ -1431,9 +1685,12 @@ public void dropPartitions(String dbName, String tblName, List partNames db.getName(), null, null, null), principalName, principalType, pgi)); } } + commit = true; return privileges; } catch (IOException e) { throw new RuntimeException(e); + } finally { + commitOrRoleBack(commit); } } @@ -1441,6 +1698,8 @@ public void dropPartitions(String dbName, String tblName, List partNames public List listPrincipalTableGrantsAll(String principalName, PrincipalType principalType) { List privileges = new ArrayList(); + boolean commit = false; + openTransaction(); try { List
tables = getHBase().scanTables(null, null); for (Table table : tables) { @@ -1472,9 +1731,12 @@ public void dropPartitions(String dbName, String tblName, List partNames pgi)); } } + commit = true; return privileges; } catch (IOException e) { throw new RuntimeException(e); + } finally { + commitOrRoleBack(commit); } } @@ -1499,6 +1761,8 @@ public void dropPartitions(String dbName, String tblName, List partNames @Override public List listGlobalGrantsAll() { List privileges = new ArrayList(); + boolean commit = false; + openTransaction(); try { PrincipalPrivilegeSet pps = getHBase().getGlobalPrivs(); if (pps != null) { @@ -1515,15 +1779,20 @@ public void dropPartitions(String dbName, String tblName, List partNames } } } + commit = true; return privileges; } catch (IOException e) { throw new RuntimeException(e); + } finally { + commitOrRoleBack(commit); } } @Override public List listDBGrantsAll(String dbName) { List privileges = new ArrayList(); + boolean commit = false; + openTransaction(); try { Database db = getHBase().getDb(dbName); PrincipalPrivilegeSet pps = db.getPrivileges(); @@ -1541,9 +1810,12 @@ public void dropPartitions(String dbName, String tblName, List partNames } } } + commit = true; return privileges; } catch (IOException e) { throw new RuntimeException(e); + } finally { + commitOrRoleBack(commit); } } @@ -1557,6 +1829,8 @@ public void dropPartitions(String dbName, String tblName, List partNames @Override public List listTableGrantsAll(String dbName, String tableName) { List privileges = new ArrayList(); + boolean commit = false; + openTransaction(); try { Table table = getHBase().getTable(dbName, tableName); PrincipalPrivilegeSet pps = table.getPrivileges(); @@ -1574,9 +1848,12 @@ public void dropPartitions(String dbName, String tblName, List partNames } } } + commit = true; return privileges; } catch (IOException e) { throw new RuntimeException(e); + } finally { + commitOrRoleBack(commit); } } @@ -1594,56 +1871,82 @@ public void dropPartitions(String dbName, String tblName, List partNames @Override public void createFunction(Function func) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); try { getHBase().putFunction(func); + commit = true; } catch (IOException e) { LOG.error("Unable to create function", e); throw new MetaException("Unable to read from or write to hbase " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public void alterFunction(String dbName, String funcName, Function newFunction) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); try { getHBase().putFunction(newFunction); + commit = true; } catch (IOException e) { LOG.error("Unable to alter function ", e); throw new MetaException("Unable to read from or write to hbase " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public void dropFunction(String dbName, String funcName) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { + boolean commit = false; + openTransaction(); try { getHBase().deleteFunction(dbName, funcName); + commit = true; } catch (IOException e) { LOG.error("Unable to delete function" + e); throw new MetaException("Unable to read from or write to hbase " + e.getMessage()); + } finally { + commitOrRoleBack(commit); } } @Override public Function getFunction(String dbName, String funcName) throws MetaException { + boolean commit = false; + openTransaction(); try { - return getHBase().getFunction(dbName, funcName); + 
+      Function func = getHBase().getFunction(dbName, funcName);
+      commit = true;
+      return func;
     } catch (IOException e) {
       LOG.error("Unable to get function" + e);
       throw new MetaException("Unable to read from or write to hbase " + e.getMessage());
+    } finally {
+      commitOrRoleBack(commit);
     }
   }

   @Override
   public List<String> getFunctions(String dbName, String pattern) throws MetaException {
+    boolean commit = false;
+    openTransaction();
     try {
       List<Function> funcs = getHBase().scanFunctions(dbName, likeToRegex(pattern));
       List<String> funcNames = new ArrayList<String>(funcs.size());
       for (Function func : funcs) funcNames.add(func.getFunctionName());
+      commit = true;
       return funcNames;
     } catch (IOException e) {
       LOG.error("Unable to get functions" + e);
       throw new MetaException("Unable to read from or write to hbase " + e.getMessage());
+    } finally {
+      commitOrRoleBack(commit);
     }
   }

@@ -1769,4 +2072,14 @@ private String likeToRegex(String like) {
     // use it, even though it wouldn't work on RDBMS backed metastores.
     return like.replace("*", ".*");
   }
+
+  private void commitOrRoleBack(boolean commit) {
+    if (commit) {
+      LOG.debug("Committing transaction");
+      commitTransaction();
+    } else {
+      LOG.debug("Rolling back transaction");
+      rollbackTransaction();
+    }
+  }
 }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/TephraHBaseConnection.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/TephraHBaseConnection.java
new file mode 100644
index 0000000..47c3f11
--- /dev/null
+++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/TephraHBaseConnection.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TransactionManager;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.distributed.ThreadLocalClientProvider;
+import co.cask.tephra.distributed.TransactionServiceClient;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
+import co.cask.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.twill.discovery.InMemoryDiscoveryService;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A class that uses Tephra for transaction management.
+ */
+public class TephraHBaseConnection extends VanillaHBaseConnection {
+  static final private Log LOG = LogFactory.getLog(TephraHBaseConnection.class.getName());
+
+  private Map<String, TransactionAwareHTable> txnTables;
+  private TransactionContext txn;
+  private TransactionSystemClient txnClient;
+
+  TephraHBaseConnection() {
+    super();
+    txnTables = new HashMap<String, TransactionAwareHTable>();
+  }
+
+  @Override
+  public void connect() throws IOException {
+    super.connect();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      LOG.debug("Using an in memory client transaction system for testing");
+      TransactionManager txnMgr = new TransactionManager(conf);
+      txnMgr.startAndWait();
+      txnClient = new InMemoryTxSystemClient(txnMgr);
+    } else {
+      // TODO should enable use of ZKDiscoveryService if users want it
+      LOG.debug("Using real client transaction system for production");
+      txnClient = new TransactionServiceClient(conf,
+          new ThreadLocalClientProvider(conf, new InMemoryDiscoveryService()));
+    }
+    for (String tableName : HBaseReadWrite.tableNames) {
+      txnTables.put(tableName, new TransactionAwareHTable(super.getHBaseTable(tableName, true)));
+    }
+    txn = new TransactionContext(txnClient, txnTables.values());
+  }
+
+  @Override
+  public void beginTransaction() throws IOException {
+    try {
+      txn.start();
+      LOG.debug("Started txn in tephra");
+    } catch (TransactionFailureException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void commitTransaction() throws IOException {
+    try {
+      txn.finish();
+      LOG.debug("Finished txn in tephra");
+    } catch (TransactionFailureException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void rollbackTransaction() throws IOException {
+    try {
+      txn.abort();
+      LOG.debug("Aborted txn in tephra");
+    } catch (TransactionFailureException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void flush(HTableInterface htab) throws IOException {
+    // NO-OP as we want to flush at commit time
+  }
+
+  @Override
+  protected HTableDescriptor buildDescriptor(String tableName, List<byte[]> columnFamilies)
+      throws IOException {
+    HTableDescriptor tableDesc = super.buildDescriptor(tableName, columnFamilies);
+    tableDesc.addCoprocessor(TransactionProcessor.class.getName());
+    return tableDesc;
+  }
+
+  @Override
+  public HTableInterface getHBaseTable(String tableName, boolean force) throws IOException {
+    // Ignore force, it will mess up our previous creation of the tables.
+    return (TransactionAwareHTable)txnTables.get(tableName);
+  }
+
+}
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/VanillaHBaseConnection.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/VanillaHBaseConnection.java
index 3f3f4a7..25334a3 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/hbase/VanillaHBaseConnection.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/VanillaHBaseConnection.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.hadoop.hive.metastore.hbase;

 import org.apache.commons.logging.Log;
@@ -24,17 +42,16 @@ public class VanillaHBaseConnection implements HBaseConnection {

   static final private Log LOG = LogFactory.getLog(VanillaHBaseConnection.class.getName());
-
-  private HConnection conn;
-  private Map<String, HTableInterface> tables;
-  Configuration conf;
+  protected HConnection conn;
+  protected Map<String, HTableInterface> tables;
+  protected Configuration conf;

   VanillaHBaseConnection() {
+    tables = new HashMap<String, HTableInterface>();
   }

   @Override
   public void connect() throws IOException {
-    tables = new HashMap<String, HTableInterface>();
     if (conf == null) throw new RuntimeException("Must call getConf before connect");
     conn = HConnectionManager.createConnection(conf);
   }
@@ -60,16 +77,26 @@ public void rollbackTransaction() throws IOException {
   }

   @Override
-  public void createHBaseTable(String tableName, List<byte[]> columnFamilies) throws
-      IOException {
+  public void flush(HTableInterface htab) throws IOException {
+    htab.flushCommits();
+  }
+
+  @Override
+  public void createHBaseTable(String tableName, List<byte[]> columnFamilies)
+      throws IOException {
     HBaseAdmin admin = new HBaseAdmin(conn);
     LOG.info("Creating HBase table " + tableName);
+    admin.createTable(buildDescriptor(tableName, columnFamilies));
+    admin.close();
+  }
+
+  protected HTableDescriptor buildDescriptor(String tableName, List<byte[]> columnFamilies)
+      throws IOException {
     HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
     for (byte[] cf : columnFamilies) {
       tableDesc.addFamily(new HColumnDescriptor(cf));
     }
-    admin.createTable(tableDesc);
-    admin.close();
+    return tableDesc;
   }

   @Override
diff --git pom.xml pom.xml
index c147d45..e679006 100644
--- pom.xml
+++ pom.xml
@@ -167,6 +167,7 @@
     2.4.0
     2.6.0
     3.0.0
+    0.4.0
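Every HBaseStore hunk above applies the same bracket: initialize a commit flag to false, call openTransaction(), do the HBase work, flip the flag only after the backend call succeeds, and let a finally block either commit or roll back so an exception never leaks an open transaction. The sketch below is a minimal, self-contained illustration of that shape, not Hive code; the Backing interface and the listNames/commitOrRollback names are illustrative stand-ins for HBaseReadWrite and the RawStore transaction hooks.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class TransactionWrapperSketch {
  /** Stand-in for the read/write layer the real store delegates to. */
  interface Backing {
    List<String> scanNames(String pattern) throws IOException;
  }

  private final Backing backing;

  TransactionWrapperSketch(Backing backing) {
    this.backing = backing;
  }

  // Stand-ins for the store's transaction hooks.
  void openTransaction() { /* begin a backend transaction */ }
  void commitTransaction() { /* commit it */ }
  void rollbackTransaction() { /* abort it */ }

  /** Each call is bracketed the same way: open, work, mark success, resolve in finally. */
  public List<String> listNames(String pattern) {
    boolean commit = false;
    openTransaction();
    try {
      List<String> names = new ArrayList<String>(backing.scanNames(pattern));
      commit = true;               // only set once the backend call has succeeded
      return names;
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      commitOrRollback(commit);    // runs on both the return path and the throw path
    }
  }

  private void commitOrRollback(boolean commit) {
    if (commit) {
      commitTransaction();
    } else {
      rollbackTransaction();
    }
  }
}

With a connection such as the Tephra-backed one above plugged in underneath, the commit call flushes the buffered writes atomically, while the rollback path discards them; with the vanilla connection both calls are effectively no-ops and each flush writes through directly.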