diff --git a/beeline/src/java/org/apache/hive/beeline/HiveSchemaHelper.java b/beeline/src/java/org/apache/hive/beeline/HiveSchemaHelper.java index 181f0d2..6e033fc 100644 --- a/beeline/src/java/org/apache/hive/beeline/HiveSchemaHelper.java +++ b/beeline/src/java/org/apache/hive/beeline/HiveSchemaHelper.java @@ -29,6 +29,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.ArrayList; import java.util.IllegalFormatException; import java.util.List; @@ -158,9 +159,9 @@ public static String getValidConfVar(HiveConf.ConfVars confVar, HiveConf hiveCon * * @param scriptDir upgrade script directory * @param scriptFile upgrade script file - * @return string of sql commands + * @return the list of sql commands */ - public String buildCommand(String scriptDir, String scriptFile) + public List buildCommands(String scriptDir, String scriptFile) throws IllegalFormatException, IOException; } @@ -222,12 +223,12 @@ public boolean needsQuotedIdentifier() { } @Override - public String buildCommand( + public List buildCommands( String scriptDir, String scriptFile) throws IllegalFormatException, IOException { BufferedReader bfReader = new BufferedReader(new FileReader(scriptDir + File.separatorChar + scriptFile)); String currLine; - StringBuilder sb = new StringBuilder(); + List commands = new ArrayList(); String currentCommand = null; while ((currLine = bfReader.readLine()) != null) { currLine = currLine.trim(); @@ -251,18 +252,17 @@ public String buildCommand( if (isNestedScript(currentCommand)) { // if this is a nested sql script then flatten it String currScript = getScriptName(currentCommand); - sb.append(buildCommand(scriptDir, currScript)); + commands.addAll(buildCommands(scriptDir, currScript)); } else { // Now we have a complete statement, process it // write the line to buffer - sb.append(currentCommand); - sb.append(System.getProperty("line.separator")); + commands.add(currentCommand); } } currentCommand = null; } bfReader.close(); - return sb.toString(); + return commands; } private void setDbOpts(String dbOpts) { diff --git a/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java b/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java index cd36ddf..b744e6f 100644 --- a/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java +++ b/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java @@ -26,8 +26,6 @@ import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.apache.commons.io.output.NullOutputStream; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaException; @@ -38,18 +36,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; import java.io.IOException; -import java.io.PrintStream; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.ArrayList; import java.util.List; public class HiveSchemaTool { @@ -117,8 +109,14 @@ private static void printAndExit(Options cmdLineOptions) { private Connection getConnectionToMetastore(boolean printInfo) throws HiveMetaException { - return HiveSchemaHelper.getConnectionToMetastore(userName, - passWord, printInfo, hiveConf); + try { + Connection conn = HiveSchemaHelper.getConnectionToMetastore(userName, + passWord, printInfo, hiveConf); + conn.setAutoCommit(false); + return conn; + } catch(SQLException e) { + throw new HiveMetaException(e); + } } private NestedScriptParser getDbCommandParser(String dbType) { @@ -128,16 +126,29 @@ private NestedScriptParser getDbCommandParser(String dbType) { /*** * Print Hive version and schema version - * @throws MetaException + * @throws HiveMetaException */ public void showInfo() throws HiveMetaException { - Connection metastoreConn = getConnectionToMetastore(true); - String hiveVersion = MetaStoreSchemaInfo.getHiveSchemaVersion(); - String dbVersion = getMetaStoreSchemaVersion(metastoreConn); - System.out.println("Hive distribution version:\t " + hiveVersion); - System.out.println("Metastore schema version:\t " + dbVersion); - assertCompatibleVersion(hiveVersion, dbVersion); - + Connection conn = null; + try { + conn = getConnectionToMetastore(true); + String hiveVersion = MetaStoreSchemaInfo.getHiveSchemaVersion(); + String dbVersion = getMetaStoreSchemaVersion(conn); + System.out.println("Hive distribution version:\t " + hiveVersion); + System.out.println("Metastore schema version:\t " + dbVersion); + assertCompatibleVersion(hiveVersion, dbVersion); + } finally { + try { + if (conn != null) { + if(dryRun) { + conn.rollback(); + } + conn.close(); + } + } catch (SQLException e) { + System.err.println("Failed to close the metastore connection:" + e.getMessage()); + } + } } // read schema version from metastore @@ -149,51 +160,40 @@ private String getMetaStoreSchemaVersion(Connection metastoreConn) } else { versionQuery = "select t.SCHEMA_VERSION from VERSION t"; } - try(Statement stmt = metastoreConn.createStatement(); - ResultSet res = stmt.executeQuery(versionQuery)) { + + Statement stmt = null; + try{ + stmt = metastoreConn.createStatement(); + ResultSet res = stmt.executeQuery(versionQuery); if (!res.next()) { throw new HiveMetaException("Didn't find version data in metastore"); } String currentSchemaVersion = res.getString(1); + if (!dryRun) { + metastoreConn.commit(); + } return currentSchemaVersion; } catch (SQLException e) { throw new HiveMetaException("Failed to get schema version.", e); - } - finally { - try { - metastoreConn.close(); - } catch (SQLException e) { - System.err.println("Failed to close the metastore connection"); - e.printStackTrace(System.err); + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch(SQLException e) { + System.err.println("Failed to close statement:" + e.getMessage()); + } } } } - // test the connection metastore using the config property - private void testConnectionToMetastore() throws HiveMetaException { - Connection conn = getConnectionToMetastore(true); - try { - conn.close(); - } catch (SQLException e) { - throw new HiveMetaException("Failed to close metastore connection", e); - } - } - - /** * check if the current schema version in metastore matches the Hive version * @throws MetaException */ - public void verifySchemaVersion() throws HiveMetaException { - // don't check version if its a dry run - if (dryRun) { - return; - } - String newSchemaVersion = getMetaStoreSchemaVersion( - getConnectionToMetastore(false)); + public void verifySchemaVersion(Connection conn) throws HiveMetaException { + String newSchemaVersion = getMetaStoreSchemaVersion(conn); // verify that the new version is added to schema assertCompatibleVersion(MetaStoreSchemaInfo.getHiveSchemaVersion(), newSchemaVersion); - } private void assertCompatibleVersion(String hiveSchemaVersion, String dbSchemaVersion) @@ -205,54 +205,68 @@ private void assertCompatibleVersion(String hiveSchemaVersion, String dbSchemaVe } /** - * Perform metastore schema upgrade. extract the current schema version from metastore - * @throws MetaException + * Perform metastore schema upgrade. Extract the current schema version from metastore + * @throws HiveMetaException */ public void doUpgrade() throws HiveMetaException { - String fromVersion = getMetaStoreSchemaVersion( - getConnectionToMetastore(false)); - if (fromVersion == null || fromVersion.isEmpty()) { - throw new HiveMetaException("Schema version not stored in the metastore. " + - "Metastore schema is too old or corrupt. Try specifying the version manually"); - } - doUpgrade(fromVersion); + doUpgrade(null); } /** * Perform metastore schema upgrade * - * @param fromSchemaVer - * Existing version of the metastore. If null, then read from the metastore - * @throws MetaException + * @param fromSchemaVer Existing version of the metastore. If null, then read from the metastore. + * @throws HiveMetaException */ - public void doUpgrade(String fromSchemaVer) throws HiveMetaException { - if (MetaStoreSchemaInfo.getHiveSchemaVersion().equals(fromSchemaVer)) { - System.out.println("No schema upgrade required from version " + fromSchemaVer); - return; - } - // Find the list of scripts to execute for this upgrade - List upgradeScripts = - metaStoreSchemaInfo.getUpgradeScripts(fromSchemaVer); - testConnectionToMetastore(); - System.out.println("Starting upgrade metastore schema from version " + - fromSchemaVer + " to " + MetaStoreSchemaInfo.getHiveSchemaVersion()); - String scriptDir = metaStoreSchemaInfo.getMetaStoreScriptDir(); + private void doUpgrade(String fromSchemaVer) throws HiveMetaException { + Connection conn = null; try { + conn = getConnectionToMetastore(true); + if (fromSchemaVer == null) { + fromSchemaVer = getMetaStoreSchemaVersion(conn); + if (fromSchemaVer == null || fromSchemaVer.isEmpty()) { + throw new HiveMetaException("Schema version not stored in the metastore. " + + "Metastore schema is too old or corrupt. Try specifying the version manually"); + } + } + + if (MetaStoreSchemaInfo.getHiveSchemaVersion().equals(fromSchemaVer)) { + System.out.println("No schema upgrade required from version " + fromSchemaVer); + return; + } + // Find the list of scripts to execute for this upgrade + List upgradeScripts = + metaStoreSchemaInfo.getUpgradeScripts(fromSchemaVer); + + System.out.println("Starting upgrade metastore schema from version " + + fromSchemaVer + " to " + MetaStoreSchemaInfo.getHiveSchemaVersion()); + String scriptDir = metaStoreSchemaInfo.getMetaStoreScriptDir(); + for (String scriptFile : upgradeScripts) { System.out.println("Upgrade script " + scriptFile); - if (!dryRun) { - runPreUpgrade(scriptDir, scriptFile); - runBeeLine(scriptDir, scriptFile); - System.out.println("Completed " + scriptFile); - } + runPreUpgrade(conn, scriptDir, scriptFile); + executeScript(conn, scriptDir, scriptFile); + System.out.println("Completed " + scriptFile); } - } catch (IOException eIO) { + + // Revalidated the new version after upgrade + verifySchemaVersion(conn); + } catch (IOException | SQLException eIO) { throw new HiveMetaException( "Upgrade FAILED! Metastore state would be inconsistent !!", eIO); } - - // Revalidated the new version after upgrade - verifySchemaVersion(); + finally { + try { + if (conn != null) { + if (dryRun) { + conn.rollback(); + } + conn.close(); + } + } catch (SQLException e) { + System.err.println("Failed to close the metastore connection:" + e.getMessage()); + } + } } /** @@ -262,9 +276,6 @@ public void doUpgrade(String fromSchemaVer) throws HiveMetaException { */ public void doInit() throws HiveMetaException { doInit(MetaStoreSchemaInfo.getHiveSchemaVersion()); - - // Revalidated the new version after upgrade - verifySchemaVersion(); } /** @@ -275,21 +286,37 @@ public void doInit() throws HiveMetaException { * @throws MetaException */ public void doInit(String toVersion) throws HiveMetaException { - testConnectionToMetastore(); - System.out.println("Starting metastore schema initialization to " + toVersion); - - String initScriptDir = metaStoreSchemaInfo.getMetaStoreScriptDir(); - String initScriptFile = metaStoreSchemaInfo.generateInitFileName(toVersion); + Connection conn = null; try { + conn = getConnectionToMetastore(true); + + System.out.println("Starting metastore schema initialization to " + toVersion); + + String initScriptDir = metaStoreSchemaInfo.getMetaStoreScriptDir(); + String initScriptFile = metaStoreSchemaInfo.generateInitFileName(toVersion); + System.out.println("Initialization script " + initScriptFile); - if (!dryRun) { - runBeeLine(initScriptDir, initScriptFile); - System.out.println("Initialization script completed"); - } - } catch (IOException e) { + executeScript(conn, initScriptDir, initScriptFile); + System.out.println("Initialization script completed"); + + // Revalidated the new version after upgrade + verifySchemaVersion(conn); + + } catch (IOException | SQLException e) { throw new HiveMetaException("Schema initialization FAILED!" + " Metastore state would be inconsistent !!", e); + } finally { + try { + if (conn != null) { + if (dryRun) { + conn.rollback(); + } + conn.close(); + } + } catch (SQLException e) { + System.err.println("Failed to close the metastore connection:" + e.getMessage()); + } } } @@ -302,7 +329,7 @@ public void doInit(String toVersion) throws HiveMetaException { * @param scriptDir upgrade script directory name * @param scriptFile upgrade script file name */ - private void runPreUpgrade(String scriptDir, String scriptFile) { + private void runPreUpgrade(Connection conn, String scriptDir, String scriptFile) { for (int i = 0;; i++) { String preUpgradeScript = MetaStoreSchemaInfo.getPreUpgradeScriptName(i, scriptFile); @@ -312,7 +339,7 @@ private void runPreUpgrade(String scriptDir, String scriptFile) { } try { - runBeeLine(scriptDir, preUpgradeScript); + executeScript(conn, scriptDir, preUpgradeScript); System.out.println("Completed " + preUpgradeScript); } catch (Exception e) { // Ignore the pre-upgrade script errors @@ -325,69 +352,37 @@ private void runPreUpgrade(String scriptDir, String scriptFile) { } } - /*** - * Run beeline with the given metastore script. Flatten the nested scripts - * into single file. + /** + * Execute the script file against database + * @param scriptDir the script directory + * @param scriptFile the script file + * @throws SQLException */ - private void runBeeLine(String scriptDir, String scriptFile) - throws IOException, HiveMetaException { - NestedScriptParser dbCommandParser = getDbCommandParser(dbType); - // expand the nested script - String sqlCommands = dbCommandParser.buildCommand(scriptDir, scriptFile); - File tmpFile = File.createTempFile("schematool", ".sql"); - tmpFile.deleteOnExit(); - - // write out the buffer into a file. Add beeline commands for autocommit and close - FileWriter fstream = new FileWriter(tmpFile.getPath()); - BufferedWriter out = new BufferedWriter(fstream); - out.write("!autocommit on" + System.getProperty("line.separator")); - out.write(sqlCommands); - out.write("!closeall" + System.getProperty("line.separator")); - out.close(); - runBeeLine(tmpFile.getPath()); - } - - // Generate the beeline args per hive conf and execute the given script - public void runBeeLine(String sqlScriptFile) throws IOException { - List argList = new ArrayList(); - argList.add("-u"); - argList.add(HiveSchemaHelper.getValidConfVar( - ConfVars.METASTORECONNECTURLKEY, hiveConf)); - argList.add("-d"); - argList.add(HiveSchemaHelper.getValidConfVar( - ConfVars.METASTORE_CONNECTION_DRIVER, hiveConf)); - argList.add("-n"); - argList.add(userName); - argList.add("-p"); - argList.add(passWord); - argList.add("-f"); - argList.add(sqlScriptFile); - - if (LOG.isDebugEnabled()) { - LOG.debug("Going to invoke file that contains:"); - FileReader fr = new FileReader(sqlScriptFile); - BufferedReader reader = new BufferedReader(fr); - String line; - while ((line = reader.readLine()) != null) { - LOG.debug("script: " + line); + private void executeScript(Connection conn, String scriptDir, String scriptFile) + throws IOException, HiveMetaException, SQLException { + Statement stmt = null; + try { + conn.setAutoCommit(false); + NestedScriptParser dbCommandParser = getDbCommandParser(dbType); + // expand the nested script + List sqlCommands = dbCommandParser.buildCommands(scriptDir, scriptFile); + stmt = conn.createStatement(); + for (String sqlCommand : sqlCommands) { + stmt.addBatch(sqlCommand); + } + stmt.executeBatch(); + // dryRun mode doesn't commit the change + if (!dryRun) { + conn.commit(); + } + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch(SQLException e) { + System.err.println("Failed to close statement:" + e.getMessage()); + } } - } - - // run the script using Beeline - BeeLine beeLine = new BeeLine(); - if (!verbose) { - beeLine.setOutputStream(new PrintStream(new NullOutputStream())); - beeLine.getOpts().setSilent(true); - } - beeLine.getOpts().setAllowMultiLineCommand(false); - beeLine.getOpts().setIsolation("TRANSACTION_READ_COMMITTED"); - // We can be pretty sure that an entire line can be processed as a single command since - // we always add a line separator at the end while calling dbCommandParser.buildCommand. - beeLine.getOpts().setEntireLineAsCommand(true); - LOG.debug("Going to run command <" + StringUtils.join(argList, " ") + ">"); - int status = beeLine.begin(argList.toArray(new String[0]), null); - if (status != 0) { - throw new IOException("Schema script failed, errorcode " + status); } } @@ -425,7 +420,7 @@ private static void initOptions(Options cmdLineOptions) { Option dbOpts = OptionBuilder.withArgName("databaseOpts") .hasArgs().withDescription("Backend DB specific options") .create("dbOpts"); - Option dryRunOpt = new Option("dryRun", "list SQL scripts (no execute)"); + Option dryRunOpt = new Option("dryRun", "execute the SQL scripts without the final commit"); Option verboseOpt = new Option("verbose", "only print SQL statements"); cmdLineOptions.addOption(help); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java index 9c30ee7..2b5b206 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java @@ -19,7 +19,6 @@ import java.io.BufferedReader; import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException;