diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 01abe9b..8ad9fe1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -177,7 +177,8 @@ private String getResult(int rowNum, int colNum) throws IOException {
       e.printStackTrace();
       throw new RuntimeException(e);
     }
-    return (results.get(rowNum).split("\\001"))[colNum];
+    // Split around the 'tab' character
+    return (results.get(rowNum).split("\\t"))[colNum];
   }
 
   private void verifyResults(String[] data) throws IOException {
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index c84570b..3e461e2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -28,10 +28,12 @@
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.net.URI;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
@@ -1286,4 +1288,37 @@ private void checkForNotExist(ResultSet res) throws Exception {
     }
     assertTrue("Rows returned from describe function", numRows > 0);
   }
-}
+
+  @Test
+  public void testReplDumpResultSet() throws Exception {
+    String tid =
+        TestJdbcWithMiniHS2.class.getCanonicalName().toLowerCase().replace('.', '_') + "_"
+            + System.currentTimeMillis();
+    String testPathName = System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
+    Path testPath = new Path(testPathName);
+    FileSystem fs = testPath.getFileSystem(new HiveConf());
+    Statement stmt = conDefault.createStatement();
+    try {
+      stmt.execute("set hive.repl.rootdir = " + testPathName);
+      ResultSet rs = stmt.executeQuery("repl dump " + testDbName);
+      ResultSetMetaData rsMeta = rs.getMetaData();
+      assertEquals(2, rsMeta.getColumnCount());
+      int numRows = 0;
+      while (rs.next()) {
+        numRows++;
+        URI uri = new URI(rs.getString(1));
+        int notificationId = rs.getInt(2);
+        assertNotNull(uri);
+        assertEquals(testPath.toUri().getScheme(), uri.getScheme());
+        assertEquals(testPath.toUri().getAuthority(), uri.getAuthority());
+        // In test setup, we append '/next' to hive.repl.rootdir and use that as the dump location
+        assertEquals(testPath.toUri().getPath() + "/next", uri.getPath());
+        assertNotNull(notificationId);
+      }
+      assertEquals(1, numRows);
+    } finally {
+      // Clean up
+      fs.delete(testPath, true);
+    }
+  }
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 7b63c52..7d23c1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -31,6 +31,7 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Map.Entry;
 import java.util.Set;
 
@@ -53,10 +54,12 @@
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -67,16 +70,20 @@
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,7 +115,7 @@
    * back and set it once we actually start running the query.
    */
   protected Set<FileSinkDesc> acidFileSinks = new HashSet<FileSinkDesc>();
-  
+
   // whether any ACID table is involved in a query
   protected boolean acidInQuery;
 
@@ -756,7 +763,7 @@ protected static void processForeignKeys(
     String[] qualifiedTabName = getQualifiedTableName((ASTNode) parent.getChild(0));
     // The ANTLR grammar looks like :
     // 1.  KW_CONSTRAINT idfr=identifier KW_FOREIGN KW_KEY fkCols=columnParenthesesList
-    //  KW_REFERENCES tabName=tableName parCols=columnParenthesesList 
+    //  KW_REFERENCES tabName=tableName parCols=columnParenthesesList
     //  enableSpec=enableSpecification validateSpec=validateSpecification relySpec=relySpecification
     // -> ^(TOK_FOREIGN_KEY $idfr $fkCols $tabName $parCols $relySpec $enableSpec $validateSpec)
     // when the user specifies the constraint name (i.e. child.getChildCount() == 11)
@@ -1324,7 +1331,7 @@ public QueryProperties getQueryProperties() {
   public Set<FileSinkDesc> getAcidFileSinks() {
     return acidFileSinks;
   }
-  
+
   public boolean hasAcidInQuery() {
     return acidInQuery;
   }
@@ -1747,4 +1754,26 @@ protected String toMessage(ErrorMsg message, Object detail) {
   public QueryState getQueryState() {
     return queryState;
   }
+
+  /**
+   * Create a FetchTask for a given thrift ddl schema.
+   *
+   * @param schema
+   *          thrift ddl
+   */
+  protected FetchTask createFetchTask(String schema) {
+    Properties prop = new Properties();
+    // Sets delimiter to tab (ascii 9)
+    prop.setProperty(serdeConstants.SERIALIZATION_FORMAT, Integer.toString(Utilities.tabCode));
+    prop.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, " ");
+    String[] colTypes = schema.split("#");
+    prop.setProperty("columns", colTypes[0]);
+    prop.setProperty("columns.types", colTypes[1]);
+    prop.setProperty(serdeConstants.SERIALIZATION_LIB, LazySimpleSerDe.class.getName());
+    FetchWork fetch =
+        new FetchWork(ctx.getResFile(), new TableDesc(TextInputFormat.class,
+            IgnoreKeyTextOutputFormat.class, prop), -1);
+    fetch.setSerializationNullFormat(" ");
+    return (FetchTask) TaskFactory.get(fetch, conf);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index c7389a8..3f58130 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -48,7 +48,6 @@
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnStatsUpdateTask;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -59,7 +58,6 @@
 import org.apache.hadoop.hive.ql.index.HiveIndex;
 import org.apache.hadoop.hive.ql.index.HiveIndex.IndexType;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
-import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -72,7 +70,6 @@
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.PKInfo;
 import org.apache.hadoop.hive.ql.parse.authorization.AuthorizationParseUtils;
 import org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactory;
 import org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactoryImpl;
@@ -104,7 +101,6 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -139,7 +135,6 @@
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
@@ -149,7 +144,6 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.FileNotFoundException;
@@ -169,7 +163,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DATABASELOCATION;
@@ -1938,27 +1931,6 @@ static public String getColPath(
   }
 
-  /**
-   * Create a FetchTask for a given thrift ddl schema.
-   *
-   * @param schema
-   *          thrift ddl
-   */
-  private FetchTask createFetchTask(String schema) {
-    Properties prop = new Properties();
-
-    prop.setProperty(serdeConstants.SERIALIZATION_FORMAT, "9");
-    prop.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, " ");
-    String[] colTypes = schema.split("#");
-    prop.setProperty("columns", colTypes[0]);
-    prop.setProperty("columns.types", colTypes[1]);
-    prop.setProperty(serdeConstants.SERIALIZATION_LIB, LazySimpleSerDe.class.getName());
-    FetchWork fetch = new FetchWork(ctx.getResFile(), new TableDesc(
-        TextInputFormat.class,IgnoreKeyTextOutputFormat.class, prop), -1);
-    fetch.setSerializationNullFormat(" ");
-    return (FetchTask) TaskFactory.get(fetch, conf);
-  }
-
   private void validateDatabase(String databaseName) throws SemanticException {
     try {
       if (!db.databaseExists(databaseName)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index a0d492d..e689990 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -149,6 +149,7 @@ private EximUtil() {
    */
  static URI getValidatedURI(HiveConf conf, String dcPath) throws SemanticException {
     try {
+      FileSystem fs = FileSystem.get(conf);
       boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE);
       URI uri = new Path(dcPath).toUri();
       String scheme = uri.getScheme();
@@ -158,36 +159,29 @@ static URI getValidatedURI(HiveConf conf, String dcPath) throws SemanticException {
       // generate absolute path relative to home directory
       if (!path.startsWith("/")) {
         if (testMode) {
-          path = (new Path(System.getProperty("test.tmp.dir"),
-              path)).toUri().getPath();
+          path = (new Path(System.getProperty("test.tmp.dir"), path)).toUri().getPath();
         } else {
-          path = (new Path(new Path("/user/" + System.getProperty("user.name")),
-              path)).toUri().getPath();
-        }
-      }
-      // set correct scheme and authority
-      if (StringUtils.isEmpty(scheme)) {
-        if (testMode) {
-          scheme = "pfile";
-        } else {
-          scheme = "hdfs";
+          path =
+              (new Path(new Path("/user/" + System.getProperty("user.name")), path)).toUri()
+                  .getPath();
         }
       }
+      // Set the scheme to the one we got from FileSystem
+      scheme = fs.getScheme();
 
-      // if scheme is specified but not authority then use the default
-      // authority
+      // if scheme is specified but not authority then use the default authority
       if (StringUtils.isEmpty(authority)) {
         URI defaultURI = FileSystem.get(conf).getUri();
         authority = defaultURI.getAuthority();
       }
 
       LOG.info("Scheme:" + scheme + ", authority:" + authority + ", path:" + path);
 
-      Collection<String> eximSchemes = conf.getStringCollection(
-          HiveConf.ConfVars.HIVE_EXIM_URI_SCHEME_WL.varname);
+      Collection<String> eximSchemes =
+          conf.getStringCollection(HiveConf.ConfVars.HIVE_EXIM_URI_SCHEME_WL.varname);
       if (!eximSchemes.contains(scheme)) {
         throw new SemanticException(
-            ErrorMsg.INVALID_PATH.getMsg(
-                "only the following file systems accepted for export/import : "
+            ErrorMsg.INVALID_PATH
+                .getMsg("only the following file systems accepted for export/import : "
                   + conf.get(HiveConf.ConfVars.HIVE_EXIM_URI_SCHEME_WL.varname)));
       }
@@ -210,29 +204,32 @@ static void validateTable(org.apache.hadoop.hive.ql.metadata.Table table) throws
     }
   }
 
-  public static String relativeToAbsolutePath(HiveConf conf, String location) throws SemanticException {
-    boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE);
-    if (testMode) {
-      URI uri = new Path(location).toUri();
-      String scheme = uri.getScheme();
-      String authority = uri.getAuthority();
-      String path = uri.getPath();
-      if (!path.startsWith("/")) {
-        path = (new Path(System.getProperty("test.tmp.dir"),
-            path)).toUri().getPath();
-      }
-      if (StringUtils.isEmpty(scheme)) {
-        scheme = "pfile";
-      }
-      try {
-        uri = new URI(scheme, authority, path, null, null);
-      } catch (URISyntaxException e) {
-        throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+  public static String relativeToAbsolutePath(HiveConf conf, String location)
+      throws SemanticException {
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE);
+      if (testMode) {
+        URI uri = new Path(location).toUri();
+        // Get scheme from the FileSystem
+        String scheme = fs.getScheme();
+        String authority = uri.getAuthority();
+        String path = uri.getPath();
+        if (!path.startsWith("/")) {
+          path = (new Path(System.getProperty("test.tmp.dir"), path)).toUri().getPath();
+        }
+        try {
+          uri = new URI(scheme, authority, path, null, null);
+        } catch (URISyntaxException e) {
+          throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+        }
+        return uri.toString();
+      } else {
+        // no-op for non-test mode for now
+        return location;
       }
-      return uri.toString();
-    } else {
-      //no-op for non-test mode for now
-      return location;
+    } catch (IOException e) {
+      throw new SemanticException(ErrorMsg.IO_ERROR.getMsg(), e);
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 938355e..9e10d4a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -17,20 +17,31 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_BATCH;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_STATUS;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -44,20 +55,6 @@
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.io.IOUtils;
 
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.hive.ql.parse.HiveParser.*;
-
 public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   // Database name or pattern
   private String dbNameOrPattern;
@@ -68,6 +65,7 @@
   private Integer batchSize;
   // Base path for REPL LOAD
   private String path;
+  private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
   public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
@@ -160,8 +158,8 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
       }
       String currentReplId =
           String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
-      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId),
-          "dump_dir,last_repl_id#string,string");
+      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId), dumpSchema);
+      setFetchTask(createFetchTask(dumpSchema));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e);
@@ -478,14 +476,7 @@ private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
     for (String s : values) {
       LOG.debug(" > " + s);
     }
-    ctx.setResFile(ctx.getLocalTmpPath());
-    // FIXME : this should not accessible by the user if we write to it from the frontend.
-    // Thus, we should Desc/Work this, otherwise there is a security issue here.
-    // Note: if we don't call ctx.setResFile, we get a NPE from the following code section
-    // If we do call it, then FetchWork thinks that the "table" here winds up thinking that
-    // this is a partitioned dir, which does not work. Thus, this does not work.
-
     writeOutput(values);
   }
 
@@ -496,18 +487,16 @@ private void writeOutput(List<String> values) throws SemanticException {
     try {
       fs = outputFile.getFileSystem(conf);
       outStream = fs.create(outputFile);
-      outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
+      outStream.writeBytes(((values.get(0) == null) ? Utilities.nullStringOutput : values.get(0)));
       for (int i = 1; i < values.size(); i++) {
-        outStream.write(Utilities.ctrlaCode);
-        outStream.writeBytes((values.get(1) == null ? Utilities.nullStringOutput : values.get(1)));
+        outStream.write(Utilities.tabCode);
+        outStream.writeBytes(values.get(i));
       }
       outStream.write(Utilities.newLineCode);
     } catch (IOException e) {
-      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
-                                      // codes
+      throw new SemanticException(e);
     } finally {
-      IOUtils.closeStream(outStream); // TODO : we have other closes here, and in ReplCopyTask -
-                                      // replace with this
+      IOUtils.closeStream(outStream);
     }
   }
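
For context only (not part of the patch): a minimal sketch of how a JDBC client might consume the two-column result set ("dump_dir", "last_repl_id") that REPL DUMP returns once this change is in place. The row itself is written by ReplicationSemanticAnalyzer.writeOutput() as a tab-separated line into ctx.getResFile() and surfaced to the client through the FetchTask built by createFetchTask(). The connection URL, credentials, and database name below are placeholders, and the example assumes the Hive JDBC driver is on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ReplDumpClient {
  public static void main(String[] args) throws Exception {
    // Placeholder HiveServer2 URL and database name; adjust for your deployment.
    String url = "jdbc:hive2://localhost:10000/default";
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    try (Connection conn = DriverManager.getConnection(url, "hive", "");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("REPL DUMP mydb")) {
      // The patch returns a single row with two string columns: dump_dir and last_repl_id.
      while (rs.next()) {
        String dumpDir = rs.getString(1);
        String lastReplId = rs.getString(2);
        System.out.println("dump_dir=" + dumpDir + ", last_repl_id=" + lastReplId);
      }
    }
  }
}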