diff --git bin/ext/hbaseimport.cmd bin/ext/hbaseimport.cmd
new file mode 100644
index 0000000..ff69007
--- /dev/null
+++ bin/ext/hbaseimport.cmd
@@ -0,0 +1,35 @@
+@echo off
+@rem Licensed to the Apache Software Foundation (ASF) under one or more
+@rem contributor license agreements.  See the NOTICE file distributed with
+@rem this work for additional information regarding copyright ownership.
+@rem The ASF licenses this file to You under the Apache License, Version 2.0
+@rem (the "License"); you may not use this file except in compliance with
+@rem the License.  You may obtain a copy of the License at
+@rem
+@rem     http://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+
+set CLASS=org.apache.hadoop.hive.metastore.hbase.HBaseImport
+set HIVE_OPTS=
+set HADOOP_CLASSPATH=
+
+pushd %HIVE_LIB%
+for /f %%a IN ('dir /b hive-exec-*.jar') do (
+  set JAR=%HIVE_LIB%\%%a
+)
+popd
+
+if [%1]==[hbaseimport_help] goto :hbaseimport_help
+
+:hbaseimport
+  call %HIVE_BIN_PATH%\ext\util\execHiveCmd.cmd %CLASS%
+goto :EOF
+
+:hbaseimport_help
+  echo "usage hive --hbaseimport"
+goto :EOF
diff --git bin/ext/hbaseimport.sh bin/ext/hbaseimport.sh
new file mode 100644
index 0000000..638cdcf
--- /dev/null
+++ bin/ext/hbaseimport.sh
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+THISSERVICE=hbaseimport
+export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
+
+hbaseimport () {
+  CLASS=org.apache.hadoop.hive.metastore.hbase.HBaseImport
+  HIVE_OPTS=''
+  execHiveCmd $CLASS "$@"
+}
+
+hbaseimport_help () {
+  echo "usage ./hive hbaseimport"
+}
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
new file mode 100644
index 0000000..e8225da
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test that import from an RDBMS-based metastore works.
+ */
+public class TestHBaseImport {
+
+  private static final Log LOG = LogFactory.getLog(TestHBaseImport.class.getName());
+
+  private static HBaseTestingUtility utility;
+  private static HTableInterface tblTable;
+  private static HTableInterface sdTable;
+  private static HTableInterface partTable;
+  private static HTableInterface dbTable;
+  private static HTableInterface roleTable;
+  private static Map<String, String> emptyParameters = new HashMap<String, String>();
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Mock private HConnection hconn;
+  private HBaseStore store;
+  private HiveConf conf;
+
+  @BeforeClass
+  public static void startMiniCluster() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.startMiniCluster();
+    byte[][] families = new byte[][] {HBaseReadWrite.CATALOG_CF, HBaseReadWrite.STATS_CF};
+    tblTable = utility.createTable(HBaseReadWrite.TABLE_TABLE.getBytes(HBaseUtils.ENCODING),
+        families);
+    sdTable = utility.createTable(HBaseReadWrite.SD_TABLE.getBytes(HBaseUtils.ENCODING),
+        HBaseReadWrite.CATALOG_CF);
+    partTable = utility.createTable(HBaseReadWrite.PART_TABLE.getBytes(HBaseUtils.ENCODING),
+        families);
+    dbTable = utility.createTable(HBaseReadWrite.DB_TABLE.getBytes(HBaseUtils.ENCODING),
+        HBaseReadWrite.CATALOG_CF);
+    roleTable = utility.createTable(HBaseReadWrite.ROLE_TABLE.getBytes(HBaseUtils.ENCODING),
+        HBaseReadWrite.CATALOG_CF);
+  }
+
+  @AfterClass
+  public static void shutdownMiniCluster() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setupConnection() throws IOException {
+    MockitoAnnotations.initMocks(this);
+    Mockito.when(hconn.getTable(HBaseReadWrite.SD_TABLE)).thenReturn(sdTable);
+    Mockito.when(hconn.getTable(HBaseReadWrite.TABLE_TABLE)).thenReturn(tblTable);
+    Mockito.when(hconn.getTable(HBaseReadWrite.PART_TABLE)).thenReturn(partTable);
+    Mockito.when(hconn.getTable(HBaseReadWrite.DB_TABLE)).thenReturn(dbTable);
+    Mockito.when(hconn.getTable(HBaseReadWrite.ROLE_TABLE)).thenReturn(roleTable);
+    conf = new HiveConf();
+    // Turn off caching, as we want to test actual interaction with HBase
+    conf.setBoolean(HBaseReadWrite.NO_CACHE_CONF, true);
+    HBaseReadWrite hbase = HBaseReadWrite.getInstance(conf);
+    hbase.setConnection(hconn);
+    store = new HBaseStore();
+    store.setConf(conf);
+  }
+
+  @Test
+  public void doImport() throws Exception {
+    RawStore rdbms = new ObjectStore();
+    rdbms.setConf(conf);
+
+    String[] dbNames = new String[] {"importdb1", "importdb2"};
+    String[] tableNames = new String[] {"nonparttable", "parttable"};
+    String[] partVals = new String[] {"na", "emea", "latam", "apac"};
+    String[] roles = new String[] {"role1", "role2"};
+    int now = (int)(System.currentTimeMillis() / 1000);
+
+    for (int i = 0; i < roles.length; i++) {
+      rdbms.addRole(roles[i], "me");
+    }
+
+    for (int i = 0; i < dbNames.length; i++) {
+      rdbms.createDatabase(new Database(dbNames[i], "no description", "file:/tmp", emptyParameters));
+
+      List<FieldSchema> cols = new ArrayList<FieldSchema>();
+      cols.add(new FieldSchema("col1", "int", "nocomment"));
+      SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+      StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+          serde, null, null, emptyParameters);
+      rdbms.createTable(new Table(tableNames[0], dbNames[i], "me", now, now, 0, sd, null,
+          emptyParameters, null, null, null));
+
+      List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+      partCols.add(new FieldSchema("region", "string", ""));
+      rdbms.createTable(new Table(tableNames[1], dbNames[i], "me", now, now, 0, sd, partCols,
+          emptyParameters, null, null, null));
+
+      for (int j = 0; j < partVals.length; j++) {
+        StorageDescriptor psd = new StorageDescriptor(sd);
+        psd.setLocation("file:/tmp/region=" + partVals[j]);
+        Partition part = new Partition(Arrays.asList(partVals[j]), dbNames[i], tableNames[1],
+            now, now, psd, emptyParameters);
+        rdbms.addPartition(part);
+      }
+    }
+
+    HBaseImport importer = new HBaseImport();
+    importer.setConnections(rdbms, store);
+    importer.run();
+
+    for (int i = 0; i < roles.length; i++) {
+      Role role = store.getRole(roles[i]);
+      Assert.assertNotNull(role);
+      Assert.assertEquals(roles[i], role.getRoleName());
+    }
+    // Make sure there aren't any extra roles
+    Assert.assertEquals(2, store.listRoleNames().size());
+
+    for (int i = 0; i < dbNames.length; i++) {
+      Database db = store.getDatabase(dbNames[i]);
+      Assert.assertNotNull(db);
+      // check one random value in the db rather than every value
+      Assert.assertEquals("file:/tmp", db.getLocationUri());
+
+      Table table = store.getTable(db.getName(), tableNames[0]);
+      Assert.assertNotNull(table);
+      Assert.assertEquals(now, table.getLastAccessTime());
+      Assert.assertEquals("input", table.getSd().getInputFormat());
+
+      table = store.getTable(db.getName(), tableNames[1]);
+      Assert.assertNotNull(table);
+
+      for (int j = 0; j < partVals.length; j++) {
+        Partition part = store.getPartition(dbNames[i], tableNames[1], Arrays.asList(partVals[j]));
+        Assert.assertNotNull(part);
+        Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation());
+      }
+
+      Assert.assertEquals(4, store.getPartitions(dbNames[i], tableNames[1], -1).size());
+      Assert.assertEquals(2, store.getAllTables(dbNames[i]).size());
+    }
+
+    Assert.assertEquals(2, store.getAllDatabases().size());
+  }
+}
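A note on the `now` timestamp used in the test above: casting `System.currentTimeMillis()` to `int` before dividing overflows, so the division has to happen first (the test body reflects this). A minimal standalone sketch, not part of the patch and with an illustrative class name, showing the difference:

```java
// Illustrative only: why the cast must wrap the whole division when deriving a
// seconds-since-epoch int from System.currentTimeMillis().
public class CastPrecedenceDemo {
  public static void main(String[] args) {
    long millis = System.currentTimeMillis();
    int overflowed = (int) millis / 1000;  // narrows the millisecond value first, overflowing
    int correct = (int) (millis / 1000);   // divides first, then narrows to a value that fits
    System.out.println("overflowed=" + overflowed + " correct=" + correct);
  }
}
```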
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
new file mode 100644
index 0000000..824c653
--- /dev/null
+++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A tool to take the contents of an RDBMS-based Hive metastore and import it into an HBase-based
+ * one.  To use this tool, the config files for Hive configured to work with the RDBMS (that is,
+ * including the JDBC connection string, etc.) and the config files for HBase must be on the
+ * classpath.  The tool then connects to the RDBMS via the
+ * {@link org.apache.hadoop.hive.metastore.ObjectStore}, to HBase via
+ * {@link org.apache.hadoop.hive.metastore.hbase.HBaseStore}, and transfers the data between them.
+ */
+public class HBaseImport {
+
+  static final private Log LOG = LogFactory.getLog(HBaseImport.class.getName());
+
+  public static void main(String[] args) {
+    HBaseImport tool = new HBaseImport();
+    try {
+      tool.run();
+    } catch (Exception e) {
+      System.err.println("Caught exception " + e.getClass().getName() + " with message <" +
+          e.getMessage() + ">");
+    }
+  }
+
+  private Configuration rdbmsConf;
+  private Configuration hbaseConf;
+  private RawStore rdbmsStore;
+  private RawStore hbaseStore;
+  private List<Database> dbs;
+  private List<Table> tables;
+
+  @VisibleForTesting
+  HBaseImport() {
+    dbs = new ArrayList<Database>();
+    tables = new ArrayList<Table>();
+
+  }
+
+  @VisibleForTesting
+  void run() throws MetaException, InstantiationException, IllegalAccessException,
+      NoSuchObjectException, InvalidObjectException {
+    init();
+    copyRoles();
+    copyDbs();
+    copyTables();
+    copyPartitions();
+  }
+
+  private void init() throws MetaException, IllegalAccessException, InstantiationException {
+    if (rdbmsStore != null) {
+      // We've been configured for testing, so don't do anything here.
+      return;
+    }
+    rdbmsConf = new Configuration();  // We're depending on having everything properly in the path
+    hbaseConf = new Configuration();
+    HiveConf.setVar(hbaseConf, HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
+        HBaseStore.class.getName());
+    HiveConf.setBoolVar(hbaseConf, HiveConf.ConfVars.METASTORE_FASTPATH, true);
+
+    // First get a connection to the RDBMS based store
+    rdbmsStore = new ObjectStore();
+    rdbmsStore.setConf(rdbmsConf);
+
+    // Get a connection to the HBase based store
+    hbaseStore = new HBaseStore();
+    hbaseStore.setConf(hbaseConf);
+  }
+
+  private void copyRoles() throws NoSuchObjectException, InvalidObjectException, MetaException {
+    screen("Copying roles");
+    for (String roleName : rdbmsStore.listRoleNames()) {
+      Role role = rdbmsStore.getRole(roleName);
+      screen("Copying role " + roleName);
+      hbaseStore.addRole(roleName, role.getOwnerName());
+    }
+  }
+
+  private void copyDbs() throws MetaException, NoSuchObjectException, InvalidObjectException {
+    screen("Copying databases");
+    for (String dbName : rdbmsStore.getAllDatabases()) {
+      Database db = rdbmsStore.getDatabase(dbName);
+      dbs.add(db);
+      screen("Copying database " + dbName);
+      hbaseStore.createDatabase(db);
+    }
+  }
+
+  private void copyTables() throws MetaException, InvalidObjectException {
+    screen("Copying tables");
+    for (Database db : dbs) {
+      screen("Copying tables in database " + db.getName());
+      for (String tableName : rdbmsStore.getAllTables(db.getName())) {
+        Table table = rdbmsStore.getTable(db.getName(), tableName);
+        tables.add(table);
+        screen("Copying table " + db.getName() + "." + tableName);
+        hbaseStore.createTable(table);
+      }
+    }
+  }
+
+  private void copyPartitions() throws MetaException, NoSuchObjectException,
+      InvalidObjectException {
+    screen("Copying partitions");
+    for (Table table : tables) {
+      System.out.print("Copying partitions for table " + table.getDbName() + "." +
+          table.getTableName());
+      for (Partition part : rdbmsStore.getPartitions(table.getDbName(), table.getTableName(), -1)) {
+        LOG.info("Copying " + table.getDbName() + "." + table.getTableName() + "." +
+            StringUtils.join(part.getValues(), ':'));
+        System.out.print('.');
+        hbaseStore.addPartition(part);
+      }
+      System.out.println();
+    }
+  }
+
+  private void screen(String msg) {
+    LOG.info(msg);
+    System.out.println(msg);
+  }
+
+  @VisibleForTesting
+  HBaseImport setConnections(RawStore rdbms, RawStore hbase) {
+    rdbmsStore = rdbms;
+    hbaseStore = hbase;
+
+    return this;
+  }
+
+}
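For reference, the importer can also be driven in-process the way `TestHBaseImport` does it. The sketch below is not part of the patch: the class name is illustrative, it must live in the `org.apache.hadoop.hive.metastore.hbase` package because `setConnections()` and `run()` are package-private, and it assumes the JDBC and HBase settings are already on the classpath. In normal use the `hbaseimport` service simply invokes `HBaseImport.main()`, which builds both stores itself.

```java
package org.apache.hadoop.hive.metastore.hbase;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ObjectStore;
import org.apache.hadoop.hive.metastore.RawStore;

// Hypothetical driver that wires the importer to pre-built stores, mirroring the test.
public class HBaseImportDriver {
  public static void main(String[] args) throws Exception {
    HiveConf rdbmsConf = new HiveConf();   // picks up the JDBC metastore settings from the classpath
    HiveConf hbaseConf = new HiveConf();   // picks up the HBase settings from the classpath
    hbaseConf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, HBaseStore.class.getName());
    hbaseConf.setBoolVar(HiveConf.ConfVars.METASTORE_FASTPATH, true);

    RawStore rdbms = new ObjectStore();
    rdbms.setConf(rdbmsConf);
    RawStore hbase = new HBaseStore();
    hbase.setConf(hbaseConf);

    // Copies roles, then databases, tables, and partitions from the RDBMS store to HBase.
    new HBaseImport().setConnections(rdbms, hbase).run();
  }
}
```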
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
index cc49d77..c3990b4 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
@@ -437,6 +437,22 @@ Role getRole(String roleName) throws IOException {
   }
 
   /**
+   * Get a list of roles.
+   * @return list of all known roles.
+   * @throws IOException
+   */
+  List<Role> scanRoles() throws IOException {
+    Iterator<Result> iter = scanWithFilter(ROLE_TABLE, null, CATALOG_CF, CATALOG_COL, null);
+    List<Role> roles = new ArrayList<Role>();
+    while (iter.hasNext()) {
+      RoleWritable role = new RoleWritable();
+      HBaseUtils.deserialize(role, iter.next().getValue(CATALOG_CF, CATALOG_COL));
+      roles.add(role.role);
+    }
+    return roles;
+  }
+
+  /**
    * Add a new role
    * @param role role object
    * @throws IOException
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 6844e14..a4a88e4 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -622,7 +622,14 @@ public Role getRole(String roleName) throws NoSuchObjectException {
 
   @Override
   public List<String> listRoleNames() {
-    throw new UnsupportedOperationException();
+    try {
+      List<Role> roles = getHBase().scanRoles();
+      List<String> roleNames = new ArrayList<String>(roles.size());
+      for (Role role : roles) roleNames.add(role.getRoleName());
+      return roleNames;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
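With `scanRoles()` and `listRoleNames()` in place, a post-import spot check of roles is possible through the `RawStore` API alone. A hedged sketch, not part of the patch and with illustrative names, assuming `rdbmsStore` and `hbaseStore` are configured the same way `HBaseImport.init()` configures them:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hive.metastore.RawStore;

public class RoleImportCheck {
  // Illustrative helper: verifies the HBase-backed store ended up with the same role
  // names as the RDBMS-backed store, which is what TestHBaseImport asserts for its two roles.
  static void checkRoles(RawStore rdbmsStore, RawStore hbaseStore) {
    List<String> expected = new ArrayList<String>(rdbmsStore.listRoleNames());
    List<String> actual = new ArrayList<String>(hbaseStore.listRoleNames());
    Collections.sort(expected);
    Collections.sort(actual);
    if (!expected.equals(actual)) {
      throw new IllegalStateException("Role mismatch: expected " + expected + " got " + actual);
    }
  }
}
```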