diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 7b63c52..2045ef4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -31,6 +31,7 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Map.Entry;
 import java.util.Set;
 
@@ -53,10 +54,12 @@
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -67,16 +70,20 @@
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.TextInputFormat;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,7 +115,7 @@
    * back and set it once we actually start running the query.
    */
   protected Set<FileSinkDesc> acidFileSinks = new HashSet<FileSinkDesc>();
-
+
   // whether any ACID table is involved in a query
   protected boolean acidInQuery;
 
@@ -756,7 +763,7 @@ protected static void processForeignKeys(
     String[] qualifiedTabName = getQualifiedTableName((ASTNode) parent.getChild(0));
     // The ANTLR grammar looks like :
     //  1. KW_CONSTRAINT idfr=identifier KW_FOREIGN KW_KEY fkCols=columnParenthesesList
-    //  KW_REFERENCES tabName=tableName parCols=columnParenthesesList
+    //  KW_REFERENCES tabName=tableName parCols=columnParenthesesList
     //  enableSpec=enableSpecification validateSpec=validateSpecification relySpec=relySpecification
     //  -> ^(TOK_FOREIGN_KEY $idfr $fkCols $tabName $parCols $relySpec $enableSpec $validateSpec)
     // when the user specifies the constraint name (i.e. child.getChildCount() == 11)
@@ -1324,7 +1331,7 @@ public QueryProperties getQueryProperties() {
   public Set<FileSinkDesc> getAcidFileSinks() {
     return acidFileSinks;
   }
-
+
   public boolean hasAcidInQuery() {
     return acidInQuery;
   }
@@ -1747,4 +1754,25 @@ protected String toMessage(ErrorMsg message, Object detail) {
   public QueryState getQueryState() {
     return queryState;
   }
+
+  /**
+   * Create a FetchTask for a given thrift ddl schema.
+   *
+   * @param schema
+   *          thrift ddl
+   */
+  protected FetchTask createFetchTask(String schema) {
+    Properties prop = new Properties();
+
+    prop.setProperty(serdeConstants.SERIALIZATION_FORMAT, "9");
+    prop.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, " ");
+    String[] colTypes = schema.split("#");
+    prop.setProperty("columns", colTypes[0]);
+    prop.setProperty("columns.types", colTypes[1]);
+    prop.setProperty(serdeConstants.SERIALIZATION_LIB, LazySimpleSerDe.class.getName());
+    FetchWork fetch = new FetchWork(ctx.getResFile(), new TableDesc(
+        TextInputFormat.class,IgnoreKeyTextOutputFormat.class, prop), -1);
+    fetch.setSerializationNullFormat(" ");
+    return (FetchTask) TaskFactory.get(fetch, conf);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index c7389a8..3f58130 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -48,7 +48,6 @@
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnStatsUpdateTask;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -59,7 +58,6 @@
 import org.apache.hadoop.hive.ql.index.HiveIndex;
 import org.apache.hadoop.hive.ql.index.HiveIndex.IndexType;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
-import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -72,7 +70,6 @@
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.PKInfo;
 import org.apache.hadoop.hive.ql.parse.authorization.AuthorizationParseUtils;
 import org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactory;
 import org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactoryImpl;
@@ -104,7 +101,6 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -139,7 +135,6 @@
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
@@ -149,7 +144,6 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.FileNotFoundException;
@@ -169,7 +163,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DATABASELOCATION;
@@ -1938,27 +1931,6 @@ static public String getColPath(
 
   }
 
-  /**
-   * Create a FetchTask for a given thrift ddl schema.
-   *
-   * @param schema
-   *          thrift ddl
-   */
-  private FetchTask createFetchTask(String schema) {
-    Properties prop = new Properties();
-
-    prop.setProperty(serdeConstants.SERIALIZATION_FORMAT, "9");
-    prop.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, " ");
-    String[] colTypes = schema.split("#");
-    prop.setProperty("columns", colTypes[0]);
-    prop.setProperty("columns.types", colTypes[1]);
-    prop.setProperty(serdeConstants.SERIALIZATION_LIB, LazySimpleSerDe.class.getName());
-    FetchWork fetch = new FetchWork(ctx.getResFile(), new TableDesc(
-        TextInputFormat.class,IgnoreKeyTextOutputFormat.class, prop), -1);
-    fetch.setSerializationNullFormat(" ");
-    return (FetchTask) TaskFactory.get(fetch, conf);
-  }
-
   private void validateDatabase(String databaseName) throws SemanticException {
     try {
       if (!db.databaseExists(databaseName)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 938355e..3948ecc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -17,20 +17,31 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_BATCH;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_STATUS;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -44,20 +55,6 @@
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.io.IOUtils;
 
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.hive.ql.parse.HiveParser.*;
-
 public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   // Database name or pattern
   private String dbNameOrPattern;
@@ -68,6 +65,7 @@
   private Integer batchSize;
   // Base path for REPL LOAD
   private String path;
+  private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
   public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
@@ -160,8 +158,8 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
       }
       String currentReplId =
          String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
-      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId),
-          "dump_dir,last_repl_id#string,string");
+      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId), dumpSchema);
+      setFetchTask(createFetchTask(dumpSchema));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e);
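
Reviewer note (not part of the patch): the schema string consumed by createFetchTask() is a plain "columns#columns.types" pair, and the new dumpSchema constant follows the same convention, so REPL DUMP now returns a fetchable two-column result (dump_dir, last_repl_id). A minimal, self-contained sketch of how that string maps onto the fetch TableDesc properties; the class name FetchSchemaSketch is illustrative only:

import java.util.Properties;

public class FetchSchemaSketch {
  public static void main(String[] args) {
    // Same convention as BaseSemanticAnalyzer.createFetchTask(String schema):
    // column names and column types joined by '#'.
    String schema = "dump_dir,last_repl_id#string,string";
    String[] colTypes = schema.split("#");

    Properties prop = new Properties();
    prop.setProperty("columns", colTypes[0]);        // dump_dir,last_repl_id
    prop.setProperty("columns.types", colTypes[1]);  // string,string
    System.out.println(prop);
  }
}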