diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/MetastoreSchemaTool.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/MetastoreSchemaTool.java
index b58b0f0d89..223b3cb362 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/MetastoreSchemaTool.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/MetastoreSchemaTool.java
@@ -442,6 +442,8 @@ public int run(String metastoreHome, String[] args, OptionGroup additionalOption
       task = new SchemaToolTaskCreateCatalog();
     } else if (cmdLine.hasOption("alterCatalog")) {
       task = new SchemaToolTaskAlterCatalog();
+    } else if (cmdLine.hasOption("mergeCatalog")) {
+      task = new SchemaToolTaskMergeCatalog();
     } else if (cmdLine.hasOption("moveDatabase")) {
       task = new SchemaToolTaskMoveDatabase();
     } else if (cmdLine.hasOption("moveTable")) {
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/SchemaToolCommandLine.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/SchemaToolCommandLine.java
index a231df51e0..a1d9228f60 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/SchemaToolCommandLine.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/SchemaToolCommandLine.java
@@ -59,6 +59,11 @@ private Options createOptions(OptionGroup additionalOptions) {
         .hasArg()
         .withDescription("Alter a catalog, requires --catalogLocation and/or --catalogDescription parameter as well")
         .create("alterCatalog");
+    Option mergeCatalog = OptionBuilder
+        .hasArg()
+        .withDescription("Merge the databases of one catalog into another. Argument is the source catalog name. " +
+            "Requires --toCatalog to indicate the destination catalog")
+        .create("mergeCatalog");
     Option moveDatabase = OptionBuilder
         .hasArg()
         .withDescription("Move a database between catalogs. Argument is the database name. " +
" + @@ -90,6 +95,7 @@ private Options createOptions(OptionGroup additionalOptions) { .addOption(validateOpt) .addOption(createCatalog) .addOption(alterCatalog) + .addOption(mergeCatalog) .addOption(moveDatabase) .addOption(moveTable) .addOption(createUserOpt) @@ -270,6 +276,11 @@ private void validate() throws ParseException { printAndExit("ifNotExists may be set only for createCatalog"); } + if (cl.hasOption("mergeCatalog") && + (!cl.hasOption("toCatalog"))) { + printAndExit("mergeCatalog and toCatalog must be set for mergeCatalog"); + } + if (cl.hasOption("moveDatabase") && (!cl.hasOption("fromCatalog") || !cl.hasOption("toCatalog"))) { printAndExit("fromCatalog and toCatalog must be set for moveDatabase"); @@ -281,7 +292,7 @@ private void validate() throws ParseException { printAndExit("fromCatalog, toCatalog, fromDatabase and toDatabase must be set for moveTable"); } - if ((!cl.hasOption("moveDatabase") && !cl.hasOption("moveTable")) && + if ((!cl.hasOption("moveDatabase") && !cl.hasOption("moveTable") && !cl.hasOption("mergeCatalog")) && (cl.hasOption("fromCatalog") || cl.hasOption("toCatalog"))) { printAndExit("fromCatalog and toCatalog may be set only for moveDatabase and moveTable"); } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/SchemaToolTaskMergeCatalog.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/SchemaToolTaskMergeCatalog.java new file mode 100644 index 0000000000..3a2f7216cd --- /dev/null +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/schematool/SchemaToolTaskMergeCatalog.java @@ -0,0 +1,156 @@ +package org.apache.hadoop.hive.metastore.tools.schematool; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.hadoop.hive.metastore.HiveMetaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier; + +public class SchemaToolTaskMergeCatalog extends SchemaToolTask { + private static final Logger + LOG = LoggerFactory.getLogger(org.apache.hadoop.hive.metastore.tools.schematool.SchemaToolTaskMergeCatalog.class.getName()); + + private String fromCatalog; + private String toCatalog; + + @Override + void setCommandLineArguments(SchemaToolCommandLine cl) { + fromCatalog = normalizeIdentifier(cl.getOptionValue("mergeCatalog")); + toCatalog = cl.getOptionValue("toCatalog"); + } + + private static final String DB_CONFLICTS_STMT = + "SELECT d.NAME as DB, d.CTLG_NAME, d2.CTLG_NAME FROM DBS d, DBS d2 " + + "WHERE d.NAME = d2.NAME AND " + + "d.CTLG_NAME = '%s' AND d2.CTLG_NAME = '%s'"; + + private static final String MERGE_CATALOG_STMT = + "UPDATE DBS " + + " SET CTLG_NAME = '%s' " + " WHERE CTLG_NAME = '%s'"; + + private static final String CONVERT_TABLE_TO_EXTERNAL = + "update TBLS set TBL_TYPE = '%s' where TBL_ID in (" + + "select tid from (select TBL_ID as tid from TBLS t2, DBS d where t2.TBL_TYPE = '%s' and t2.DB_ID = d.DB_ID " + + "and d.CTLG_NAME = '%s') c) "; + + private static final String UPDATE_CTLG_NAME_ON_DBS = + "update DBS d set d.CTLG_NAME = '%s' WHERE d.CTLG_NAME = '%s' "; + + private static final String ADD_PARAM_TO_TABLE = + "INSERT INTO TABLE_PARAMS (TBL_ID, PARAM_KEY, PARAM_VALUE) select TBL_ID, " + + "'%s', '%s' from TBLS where TBL_TYPE = '%s' "; + + private static final String ADD_AUTOPURGE_TO_TABLE = + "INSERT INTO 
+      "INSERT INTO TABLE_PARAMS (TBL_ID, PARAM_KEY, PARAM_VALUE) select TBL_ID, " +
+      "'%s', '%s' from TBLS t, DBS d, CTLGS c " +
+      "where TBL_TYPE = '%s' and t.DB_ID = d.DB_ID and d.CTLG_NAME = c.NAME and c.NAME = '%s' ";
+
+  @Override
+  void execute() throws HiveMetaException {
+    if (fromCatalog == null || toCatalog == null) {
+      throw new HiveMetaException("Merge catalog requires --mergeCatalog and --toCatalog arguments");
+    }
+    System.out.println("Merging databases from " + fromCatalog + " to " + toCatalog);
+
+    Connection conn = schemaTool.getConnectionToMetastore(true);
+    boolean success = false;
+    long initTime, prevTime, curTime;
+
+    try {
+      // determine conflicts between catalogs first
+      try (Statement stmt = conn.createStatement()) {
+        initTime = System.currentTimeMillis();
+        // TODO ensure both catalogs exist first.
+
+        // Detect conflicting databases
+        String conflicts = String.format(schemaTool.quote(DB_CONFLICTS_STMT), fromCatalog, toCatalog);
+        System.out.println("Determining name conflicts between databases across catalogs");
+        LOG.info("[DB Conflicts] Executing SQL:" + conflicts);
+        ResultSet rs = stmt.executeQuery(conflicts);
+        boolean cleanMerge = true;
+        while (rs.next()) {
+          cleanMerge = false;
+          System.out.println(
+              "Name conflict(s) between merging catalogs, database " + rs.getString(1) + " exists in catalogs " +
+              rs.getString(2) + " and " + rs.getString(3));
+        }
+
+        if (!cleanMerge) {
+          System.out.println("[ERROR] Please resolve the database name conflicts shown above manually and retry the mergeCatalog operation.");
+          System.exit(1);
+        }
+
+        conn.setAutoCommit(false);
+        String insert =
+            String.format(schemaTool.quote(ADD_AUTOPURGE_TO_TABLE), "EXTERNAL", "TRUE", "MANAGED_TABLE", fromCatalog);
+        System.out.println("Setting EXTERNAL=TRUE on all MANAGED tables in catalog " + fromCatalog);
+        LOG.debug("[external table property] Executing SQL:" + insert);
+        prevTime = System.currentTimeMillis();
+        int count = stmt.executeUpdate(insert);
+        curTime = System.currentTimeMillis();
+        System.out.println("Set EXTERNAL=TRUE on " + count + " tables, time taken (ms):" + (curTime - prevTime));
+
+        insert = String.format(schemaTool.quote(ADD_AUTOPURGE_TO_TABLE), "external.table.purge", "true", "MANAGED_TABLE",
+            fromCatalog);
+        System.out.println("Setting external.table.purge=true on all MANAGED tables in catalog " + fromCatalog);
+        LOG.debug("[external.table.purge] Executing SQL:" + insert);
+        prevTime = curTime;
+        count = stmt.executeUpdate(insert);
+        curTime = System.currentTimeMillis();
+        System.out.println("Set external.table.purge on " + count + " tables, time taken (ms):" + (curTime - prevTime));
+
+        String update =
+            String.format(schemaTool.quote(CONVERT_TABLE_TO_EXTERNAL), "EXTERNAL_TABLE", "MANAGED_TABLE", fromCatalog);
+        System.out.println("Setting tableType to EXTERNAL on all MANAGED tables in catalog " + fromCatalog);
+        LOG.debug("[tableType=EXTERNAL_TABLE] Executing SQL:" + update);
+        prevTime = curTime;
+        count = stmt.executeUpdate(update);
+        curTime = System.currentTimeMillis();
+        System.out.println("Set tableType=EXTERNAL_TABLE on " + count + " tables, time taken (ms):" + (curTime - prevTime));
+
+        String merge = String.format(schemaTool.quote(MERGE_CATALOG_STMT), toCatalog, fromCatalog);
+        System.out.println("Setting catalog names on all databases in catalog " + fromCatalog);
+        LOG.debug("[catalog name] Executing SQL:" + merge);
+        prevTime = curTime;
+        count = stmt.executeUpdate(merge);
+        curTime = System.currentTimeMillis();
+        System.out.println("Changed catalog names on " + count + " databases, time taken (ms):" + (curTime - prevTime));
+ count + " databases, time taken (ms):" + (curTime - prevTime)); + + if (count == 0) { + LOG.info(count + " databases have been merged from catalog " + fromCatalog + " into " + toCatalog); + } + if (schemaTool.isDryRun()) { + conn.rollback(); + success = true; + } else { + conn.commit(); + System.out.println("Committed the changes. Total time taken (ms):" + (curTime - initTime)); + success = true; + } + } + } catch (SQLException e) { + throw new HiveMetaException("Failed to merge catalog", e); + } finally { + try { + if (!success) { + System.out.println("Rolling back transaction"); + conn.rollback(); + } + conn.close(); + } catch (SQLException e) { + // Not really much we can do here. + LOG.error("Failed to rollback, everything will probably go bad from here.", e); + try { + conn.close(); + } catch (SQLException ex) { + LOG.warn("Failed to close connection.", ex); + } + } + } + } +}