diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b80f7b5..3a65c80 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -211,6 +211,7 @@ private static URL checkConfigFile(File f) {
    */
   public static final HiveConf.ConfVars[] metaVars = {
       HiveConf.ConfVars.METASTOREWAREHOUSE,
+      HiveConf.ConfVars.REPLDIR,
       HiveConf.ConfVars.METASTOREURIS,
       HiveConf.ConfVars.METASTORE_SERVER_PORT,
       HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES,
@@ -432,6 +433,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. " +
         "For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, " +
         "with ${hive.scratch.dir.permission}."),
+    REPLDIR("hive.repl.rootdir","/user/hive/repl/",
+        "HDFS root dir for all replication dumps."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
@@ -2062,7 +2065,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "When true the HDFS location stored in the index file will be ignored at runtime.\n" +
         "If the data got moved or the name of the cluster got changed, the index data should still be usable."),

-    HIVE_EXIM_URI_SCHEME_WL("hive.exim.uri.scheme.whitelist", "hdfs,pfile",
+    HIVE_EXIM_URI_SCHEME_WL("hive.exim.uri.scheme.whitelist", "hdfs,pfile,file",
         "A comma separated list of acceptable URI schemes for import and export."),
     // temporary variable for testing. This is added just to turn off this feature in case of a bug in
     // deployment. It has not been documented in hive-default.xml intentionally, this should be removed
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
new file mode 100644
index 0000000..4c0f817
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.plan.CopyWork; +import org.apache.hadoop.hive.ql.plan.ReplCopyWork; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.util.StringUtils; + +public class ReplCopyTask extends Task implements Serializable { + + + private static final long serialVersionUID = 1L; + + private static transient final Logger LOG = LoggerFactory.getLogger(ReplCopyTask.class); + + public ReplCopyTask(){ + super(); + } + + @Override + protected int execute(DriverContext driverContext) { + LOG.debug("ReplCopyTask.execute()"); + FileSystem dstFs = null; + Path toPath = null; + try { + Path fromPath = work.getFromPath(); + toPath = work.getToPath(); + + console.printInfo("Copying data from " + fromPath.toString(), " to " + + toPath.toString()); + + ReplCopyWork rwork = ((ReplCopyWork)work); + + FileSystem srcFs = fromPath.getFileSystem(conf); + dstFs = toPath.getFileSystem(conf); + + List srcFiles = new ArrayList(); + FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath); + LOG.debug("ReplCopyTasks srcs=" + (srcs == null ? "null" : srcs.length)); + if (! rwork.getReadListFromInput()){ + if (srcs == null || srcs.length == 0) { + if (work.isErrorOnSrcEmpty()) { + console.printError("No files matching path: " + fromPath.toString()); + return 3; + } else { + return 0; + } + } + } else { + LOG.debug("ReplCopyTask making sense of _files"); + // Our input is probably the result of a _files listing, we should expand out _files. + srcFiles = filesInFileListing(srcFs,fromPath); + LOG.debug("ReplCopyTask _files contains:" + (srcFiles == null ? "null" : srcFiles.size())); + if (srcFiles == null){ + if (work.isErrorOnSrcEmpty()) { + console.printError("No _files entry found on source: " + fromPath.toString()); + return 5; + } else { + return 0; + } + } + } + // Add in all the lone filecopies expected as well - applies to + // both _files case stragglers and regular copies + srcFiles.addAll(Arrays.asList(srcs)); + LOG.debug("ReplCopyTask numFiles:" + (srcFiles == null ? 
"null" : srcFiles.size())); + + boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); + if (!FileUtils.mkdir(dstFs, toPath, inheritPerms, conf)) { + console.printError("Cannot make target directory: " + toPath.toString()); + return 2; + } + + BufferedWriter listBW = null; + if (rwork.getListFilesOnOutputBehaviour()){ + Path listPath = new Path(toPath,"_files"); + LOG.debug("ReplCopyTask : generating _files at :" + listPath.toUri().toString()); + if (dstFs.exists(listPath)){ + console.printError("Cannot make target _files file:" + listPath.toString()); + return 4; + } + listBW = new BufferedWriter(new OutputStreamWriter(dstFs.create(listPath))); + // TODO : verify that not specifying charset here does not bite us + // later(for cases where filenames have unicode chars) + } + + for (FileStatus oneSrc : srcFiles) { + console.printInfo("Copying file: " + oneSrc.getPath().toString()); + LOG.debug("Copying file: " + oneSrc.getPath().toString()); + if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){ + FileSystem actualSrcFs = null; + if (rwork.getReadListFromInput()){ + // TODO : filesystemcache prevents this from being a perf nightmare, but we + // should still probably follow up to see if we need to do something better here. + actualSrcFs = oneSrc.getPath().getFileSystem(conf); + } else { + actualSrcFs = srcFs; + } + + LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath); + if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath, + false, // delete source + true, // overwrite destination + conf)) { + console.printError("Failed to copy: '" + oneSrc.getPath().toString() + + "to: '" + toPath.toString() + "'"); + return 1; + } + }else{ + LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri()); + console.printInfo("Tracking file: " + oneSrc.getPath().toUri()); + listBW.write(oneSrc.getPath().toUri().toString() + "\n"); + } + } + + if (listBW != null){ + listBW.close(); + } + + return 0; + + } catch (Exception e) { + console.printError("Failed with exception " + e.getMessage(), "\n" + + StringUtils.stringifyException(e)); + return (1); + } + } + + + private List filesInFileListing(FileSystem fs, Path path) + throws IOException { + Path fileListing = new Path(path, "_files"); + LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri()); + if (! fs.exists(fileListing)){ + LOG.debug("ReplCopyTask : _files does not exist"); + return null; // Returning null from this fn can serve as an err condition. + // On success, but with nothing to return, we can return an empty list. + } + + List ret = new ArrayList(); + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing))); + // TODO : verify if skipping charset here is okay + + String line = null; + while ( (line = br.readLine()) != null){ + LOG.debug("ReplCopyTask :_filesReadLine:" + line); + Path p = new Path(line); + FileSystem srcFs = p.getFileSystem(conf); // TODO : again, fs cache should make this okay, but if not, revisit + ret.add(srcFs.getFileStatus(p)); + // Note - we need srcFs rather than fs, because it is possible that the _files lists files + // which are from a different filesystem than the fs where the _files file itself was loaded + // from. Currently, it is possible, for eg., to do REPL LOAD hdfs:///dir/ and for the _files + // in it to contain hdfs:/// entries, and/or vice-versa, and this causes errors. + // It might also be possible that there will be a mix of them in a given _files file. 
+ // TODO: revisit close to the end of replv2 dev, to see if our assumption now still holds, + // and if not so, optimize. + } + + return ret; + } + + @Override + public StageType getType() { + return StageType.COPY; + // there's no extensive need for this to have its own type - it mirrors + // the intent of copy enough. This might change later, though. + } + + @Override + public String getName() { + return "REPL_COPY"; + } + + public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) { + Task copyTask = null; + LOG.debug("ReplCopyTask:getLoadCopyTask: "+srcPath + "=>" + dstPath); + if (replicationSpec.isInReplicationScope()){ + ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); + LOG.debug("ReplCopyTask:\trcwork"); + if (replicationSpec.isLazy()){ + LOG.debug("ReplCopyTask:\tlazy"); + rcwork.setReadListFromInput(true); + } + copyTask = TaskFactory.get(rcwork, conf); + } else { + LOG.debug("ReplCopyTask:\tcwork"); + copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf); + } + return copyTask; + } + + public static Task getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) { + Task copyTask = null; + LOG.debug("ReplCopyTask:getDumpCopyTask: "+srcPath + "=>" + dstPath); + if (replicationSpec.isInReplicationScope()){ + ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); + LOG.debug("ReplCopyTask:\trcwork"); + if (replicationSpec.isLazy()){ + LOG.debug("ReplCopyTask:\tlazy"); + rcwork.setListFilesOnOutputBehaviour(true); + } + copyTask = TaskFactory.get(rcwork, conf); + } else { + LOG.debug("ReplCopyTask:\tcwork"); + copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf); + } + return copyTask; + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 14fd61a..d61a460 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.MapredLocalWork; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.ReplCopyWork; import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.plan.StatsNoJobWork; import org.apache.hadoop.hive.ql.plan.StatsWork; @@ -77,6 +78,7 @@ public TaskTuple(Class workClass, Class> taskClass) { taskvec.add(new TaskTuple(MoveWork.class, MoveTask.class)); taskvec.add(new TaskTuple(FetchWork.class, FetchTask.class)); taskvec.add(new TaskTuple(CopyWork.class, CopyTask.class)); + taskvec.add(new TaskTuple(ReplCopyWork.class, ReplCopyTask.class)); taskvec.add(new TaskTuple(DDLWork.class, DDLTask.class)); taskvec.add(new TaskTuple(FunctionWork.class, FunctionTask.class)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index ffb6ae3..7b63c52 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -1607,8 +1607,12 @@ protected WriteEntity toWriteEntity(String location) throws SemanticException { } protected WriteEntity toWriteEntity(Path location) throws SemanticException { + return toWriteEntity(location,conf); + } + + public static WriteEntity toWriteEntity(Path location, HiveConf conf) throws 
SemanticException { try { - Path path = tryQualifyPath(location); + Path path = tryQualifyPath(location,conf); return new WriteEntity(path, FileUtils.isLocalFile(conf, path.toUri())); } catch (Exception e) { throw new SemanticException(e); @@ -1620,8 +1624,12 @@ protected ReadEntity toReadEntity(String location) throws SemanticException { } protected ReadEntity toReadEntity(Path location) throws SemanticException { + return toReadEntity(location, conf); + } + + public static ReadEntity toReadEntity(Path location, HiveConf conf) throws SemanticException { try { - Path path = tryQualifyPath(location); + Path path = tryQualifyPath(location, conf); return new ReadEntity(path, FileUtils.isLocalFile(conf, path.toUri())); } catch (Exception e) { throw new SemanticException(e); @@ -1629,6 +1637,10 @@ protected ReadEntity toReadEntity(Path location) throws SemanticException { } private Path tryQualifyPath(Path path) throws IOException { + return tryQualifyPath(path,conf); + } + + public static Path tryQualifyPath(Path path, HiveConf conf) throws IOException { try { return path.getFileSystem(conf).makeQualified(path); } catch (IOException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 167f7a5..a0d492d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -20,6 +20,14 @@ import com.google.common.base.Function; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FSDataInputStream; @@ -45,10 +53,12 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -63,8 +73,73 @@ */ public class EximUtil { + public static final String METADATA_NAME="_metadata"; + private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class); + /** + * Wrapper class for common BaseSemanticAnalyzer non-static members + * into static generic methods without having the fn signatures + * becoming overwhelming, with passing each of these into every function. + * + * Note, however, that since this is constructed with args passed in, + * parts of the context, such as the tasks or inputs, might have been + * overridden with temporary context values, rather than being exactly + * 1:1 equivalent to BaseSemanticAnalyzer.getRootTasks() or BSA.getInputs(). 
+ */ + public static class SemanticAnalyzerWrapperContext { + private HiveConf conf; + private Hive db; + private HashSet inputs; + private HashSet outputs; + private List> tasks; + private Logger LOG; + private Context ctx; + + public HiveConf getConf() { + return conf; + } + + public Hive getHive() { + return db; + } + + public HashSet getInputs() { + return inputs; + } + + public HashSet getOutputs() { + return outputs; + } + + public List> getTasks() { + return tasks; + } + + public Logger getLOG() { + return LOG; + } + + public Context getCtx() { + return ctx; + } + + public SemanticAnalyzerWrapperContext(HiveConf conf, Hive db, + HashSet inputs, + HashSet outputs, + List> tasks, + Logger LOG, Context ctx){ + this.conf = conf; + this.db = db; + this.inputs = inputs; + this.outputs = outputs; + this.tasks = tasks; + this.LOG = LOG; + this.ctx = ctx; + } + } + + private EximUtil() { } @@ -162,10 +237,41 @@ public static String relativeToAbsolutePath(HiveConf conf, String location) thro } /* major version number should match for backward compatibility */ - public static final String METADATA_FORMAT_VERSION = "0.1"; + public static final String METADATA_FORMAT_VERSION = "0.2"; + /* If null, then the major version number should match */ public static final String METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION = null; + public static void createDbExportDump( + FileSystem fs, Path metadataPath, Database dbObj, + ReplicationSpec replicationSpec) throws IOException, SemanticException { + + // WARNING NOTE : at this point, createDbExportDump lives only in a world where ReplicationSpec is in replication scope + // If we later make this work for non-repl cases, analysis of this logic might become necessary. Also, this is using + // Replv2 semantics, i.e. with listFiles laziness (no copy at export time) + + OutputStream out = fs.create(metadataPath); + JsonGenerator jgen = (new JsonFactory()).createJsonGenerator(out); + jgen.writeStartObject(); + jgen.writeStringField("version",METADATA_FORMAT_VERSION); + dbObj.putToParameters(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replicationSpec.getCurrentReplicationState()); + + if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) { + jgen.writeStringField("fcversion",METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION); + } + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + try { + jgen.writeStringField("db", serializer.toString(dbObj, "UTF-8")); + } catch (TException e) { + throw new SemanticException( + ErrorMsg.ERROR_SERIALIZE_METASTORE + .getMsg(), e); + } + + jgen.writeEndObject(); + jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close. 
+ } + public static void createExportDump(FileSystem fs, Path metadataPath, org.apache.hadoop.hive.ql.metadata.Table tableHandle, Iterable partitions, @@ -255,19 +361,25 @@ private static void write(OutputStream out, String s) throws IOException { * Utility class to help return complex value from readMetaData function */ public static class ReadMetaData { + private final Database db; private final Table table; private final Iterable partitions; private final ReplicationSpec replicationSpec; public ReadMetaData(){ - this(null,null,new ReplicationSpec()); + this(null,null,null,new ReplicationSpec()); } - public ReadMetaData(Table table, Iterable partitions, ReplicationSpec replicationSpec){ + public ReadMetaData(Database db, Table table, Iterable partitions, ReplicationSpec replicationSpec){ + this.db = db; this.table = table; this.partitions = partitions; this.replicationSpec = replicationSpec; } + public Database getDatabase(){ + return db; + } + public Table getTable() { return table; } @@ -298,12 +410,21 @@ public static ReadMetaData readMetaData(FileSystem fs, Path metadataPath) String version = jsonContainer.getString("version"); String fcversion = getJSONStringEntry(jsonContainer, "fcversion"); checkCompatibility(version, fcversion); + + String dbDesc = getJSONStringEntry(jsonContainer, "db"); String tableDesc = getJSONStringEntry(jsonContainer,"table"); + TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory()); + + Database db = null; + if (dbDesc != null){ + db = new Database(); + deserializer.deserialize(db, dbDesc, "UTF-8"); + } + Table table = null; List partitionsList = null; if (tableDesc != null){ table = new Table(); - TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory()); deserializer.deserialize(table, tableDesc, "UTF-8"); // TODO : jackson-streaming-iterable-redo this JSONArray jsonPartitions = new JSONArray(jsonContainer.getString("partitions")); @@ -316,7 +437,7 @@ public static ReadMetaData readMetaData(FileSystem fs, Path metadataPath) } } - return new ReadMetaData(table, partitionsList,readReplicationSpec(jsonContainer)); + return new ReadMetaData(db, table, partitionsList,readReplicationSpec(jsonContainer)); } catch (JSONException e) { throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e); } catch (TException e) { @@ -438,4 +559,18 @@ public static boolean schemaCompare(List newSchema, List> rootTasks, HashSet inputs, HashSet outputs, + Logger LOG) throws SemanticException { + if (ts != null) { try { EximUtil.validateTable(ts.tableHandle); @@ -109,6 +126,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { } try { + FileSystem fs = FileSystem.get(toURI, conf); Path toPath = new Path(toURI.getScheme(), toURI.getAuthority(), toURI.getPath()); try { @@ -156,12 +174,12 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { EximUtil.createExportDump( FileSystem.getLocal(conf), path, - (ts != null ? ts.tableHandle: null), + (ts != null ? 
ts.tableHandle : null),
          partitions,
          replicationSpec);

-      Task<? extends Serializable> rTask = TaskFactory.get(new CopyWork(
-          path, new Path(toURI), false), conf);
+      Task<? extends Serializable> rTask = ReplCopyTask.getDumpCopyTask(replicationSpec, path, new Path(toURI), conf);
+
      rootTasks.add(rTask);
      LOG.debug("_metadata file written into " + path.toString()
          + " and then copied to " + toURI.toString());
@@ -177,23 +195,22 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
        for (Partition partition : partitions) {
          Path fromPath = partition.getDataLocation();
          Path toPartPath = new Path(parentPath, partition.getName());
-          Task<? extends Serializable> rTask = TaskFactory.get(
-              new CopyWork(fromPath, toPartPath, false),
-              conf);
+          Task<? extends Serializable> rTask =
+              ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toPartPath, conf);
          rootTasks.add(rTask);
          inputs.add(new ReadEntity(partition));
        }
      } else {
        Path fromPath = ts.tableHandle.getDataLocation();
        Path toDataPath = new Path(parentPath, "data");
-        Task<? extends Serializable> rTask = TaskFactory.get(new CopyWork(
-            fromPath, toDataPath, false), conf);
+        Task<? extends Serializable> rTask =
+            ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
        rootTasks.add(rTask);
        inputs.add(new ReadEntity(ts.tableHandle));
      }
-      outputs.add(toWriteEntity(parentPath));
+      outputs.add(toWriteEntity(parentPath,conf));
    }
-  }
+  }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index 63c32a8..bf85ade 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -336,6 +336,10 @@ KW_KEY: 'KEY';
 KW_ABORT: 'ABORT';
 KW_EXTRACT: 'EXTRACT';
 KW_FLOOR: 'FLOOR';
+KW_REPL: 'REPL';
+KW_DUMP: 'DUMP';
+KW_BATCH: 'BATCH';
+KW_STATUS: 'STATUS';

 // Operators
 // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 8aa39b0..7974d39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -384,6 +384,11 @@ TOK_ROLLBACK;
 TOK_SET_AUTOCOMMIT;
 TOK_CACHE_METADATA;
 TOK_ABORT_TRANSACTIONS;
+TOK_REPL_DUMP;
+TOK_REPL_LOAD;
+TOK_REPL_STATUS;
+TOK_BATCH;
+TOK_TO;
 }

@@ -733,6 +738,9 @@ execStatement
     | loadStatement
     | exportStatement
     | importStatement
+    | replDumpStatement
+    | replLoadStatement
+    | replStatusStatement
     | ddlStatement
     | deleteStatement
     | updateStatement
@@ -773,6 +781,35 @@ importStatement
     -> ^(TOK_IMPORT $path $tab? $ext? tableLocation?)
     ;

+replDumpStatement
+@init { pushMsg("replication dump statement", state); }
+@after { popMsg(state); }
+      : KW_REPL KW_DUMP
+        (dbName=identifier) (DOT tblName=identifier)?
+        (KW_FROM (eventId=Number)
+          (KW_TO (rangeEnd=Number))?
+          (KW_BATCH (batchSize=Number))?
+        )?
+    -> ^(TOK_REPL_DUMP $dbName $tblName? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_BATCH $batchSize)?)? )
+    ;
+
+replLoadStatement
+@init { pushMsg("replication load statement", state); }
+@after { popMsg(state); }
+      : KW_REPL KW_LOAD
+        ((dbName=identifier) (DOT tblName=identifier)?)?
+        KW_FROM (path=StringLiteral)
+    -> ^(TOK_REPL_LOAD $path $dbName? $tblName?)
+    ;
+
+replStatusStatement
+@init { pushMsg("replication status statement", state); }
+@after { popMsg(state); }
+      : KW_REPL KW_STATUS
+        (dbName=identifier) (DOT tblName=identifier)?
+    -> ^(TOK_REPL_STATUS $dbName $tblName?)
+ ; + ddlStatement @init { pushMsg("ddl statement", state); } @after { popMsg(state); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g index 2e40aa5..4af0c62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g @@ -759,6 +759,7 @@ nonReserved | KW_VALIDATE | KW_NOVALIDATE | KW_KEY + | KW_REPL | KW_DUMP | KW_BATCH | KW_STATUS ; //The following SQL2011 reserved keywords are used as function name only, but not as identifiers. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 9986fcf..3420efd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; @@ -48,16 +49,17 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; -import org.apache.hadoop.hive.ql.plan.CopyWork; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DropTableDesc; @@ -73,12 +75,14 @@ */ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { - public static final String METADATA_NAME="_metadata"; - public ImportSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); } + // FIXME : Note that the tableExists flag as used by Auth is kinda a hack and + // assumes only 1 table will ever be imported - this assumption is broken by + // REPL LOAD. We need to fix this. Maybe by continuing the hack and replacing + // by a map, maybe by coming up with a better api for it. private boolean tableExists = false; public boolean existsTable() { @@ -92,14 +96,16 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { boolean isLocationSet = false; boolean isExternalSet = false; - boolean isTableSet = false; - boolean isDbNameSet = false; boolean isPartSpecSet = false; String parsedLocation = null; String parsedTableName = null; String parsedDbName = null; LinkedHashMap parsedPartSpec = new LinkedHashMap(); + // waitOnCreateDb determines whether or not non-existence of + // db is an error. For regular imports, it is. 
+ boolean waitOnCreateDb = false; + for (int i = 1; i < ast.getChildCount(); ++i){ ASTNode child = (ASTNode) ast.getChild(i); switch (child.getToken().getType()){ @@ -111,14 +117,10 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { parsedLocation = EximUtil.relativeToAbsolutePath(conf, unescapeSQLString(child.getChild(0).getText())); break; case HiveParser.TOK_TAB: - isTableSet = true; ASTNode tableNameNode = (ASTNode) child.getChild(0); Map.Entry dbTablePair = getDbTableNamePair(tableNameNode); parsedDbName = dbTablePair.getKey(); parsedTableName = dbTablePair.getValue(); - if (parsedDbName != null){ - isDbNameSet = true; - } // get partition metadata if partition specified if (child.getChildCount() == 2) { ASTNode partspec = (ASTNode) child.getChild(1); @@ -130,111 +132,11 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { } // parsing statement is now done, on to logic. + tableExists = prepareImport( + isLocationSet, isExternalSet, isPartSpecSet, waitOnCreateDb, + parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(), + new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx)); - // initialize load path - URI fromURI = EximUtil.getValidatedURI(conf, stripQuotes(fromTree.getText())); - FileSystem fs = FileSystem.get(fromURI, conf); - Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); - inputs.add(toReadEntity(fromPath)); - - EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData(); - try { - rv = EximUtil.readMetaData(fs, new Path(fromPath, METADATA_NAME)); - } catch (IOException e) { - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); - } - - ReplicationSpec replicationSpec = rv.getReplicationSpec(); - if (replicationSpec.isNoop()){ - // nothing to do here, silently return. - return; - } - - String dbname = SessionState.get().getCurrentDatabase(); - if (isDbNameSet){ - // If the parsed statement contained a db.tablename specification, prefer that. - dbname = parsedDbName; - } - - // Create table associated with the import - // Executed if relevant, and used to contain all the other details about the table if not. - CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable()); - - if (isExternalSet){ - tblDesc.setExternal(isExternalSet); - // This condition-check could have been avoided, but to honour the old - // default of not calling if it wasn't set, we retain that behaviour. - // TODO:cleanup after verification that the outer if isn't really needed here - } - - if (isLocationSet){ - tblDesc.setLocation(parsedLocation); - inputs.add(toReadEntity(parsedLocation)); - } - - if (isTableSet){ - tblDesc.setTableName(parsedTableName); - } - - List partitionDescs = new ArrayList(); - Iterable partitions = rv.getPartitions(); - for (Partition partition : partitions) { - // TODO: this should ideally not create AddPartitionDesc per partition - AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition); - partitionDescs.add(partsDesc); - } - - if (isPartSpecSet){ - // The import specification asked for only a particular partition to be loaded - // We load only that, and ignore all the others. 
- boolean found = false; - for (Iterator partnIter = partitionDescs - .listIterator(); partnIter.hasNext();) { - AddPartitionDesc addPartitionDesc = partnIter.next(); - if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) { - found = true; - } else { - partnIter.remove(); - } - } - if (!found) { - throw new SemanticException( - ErrorMsg.INVALID_PARTITION - .getMsg(" - Specified partition not found in import directory")); - } - } - - if (tblDesc.getTableName() == null) { - // Either we got the tablename from the IMPORT statement (first priority) - // or from the export dump. - throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg()); - } else { - conf.set("import.destination.table", tblDesc.getTableName()); - for (AddPartitionDesc addPartitionDesc : partitionDescs) { - addPartitionDesc.setTableName(tblDesc.getTableName()); - } - } - - Warehouse wh = new Warehouse(conf); - Table table = tableIfExists(tblDesc); - - if (table != null){ - checkTable(table, tblDesc,replicationSpec); - LOG.debug("table " + tblDesc.getTableName() + " exists: metadata checked"); - tableExists = true; - } - - if (!replicationSpec.isInReplicationScope()){ - createRegularImportTasks( - rootTasks, tblDesc, partitionDescs, - isPartSpecSet, replicationSpec, table, - fromURI, fs, wh); - } else { - createReplImportTasks( - rootTasks, tblDesc, partitionDescs, - isPartSpecSet, replicationSpec, table, - fromURI, fs, wh); - } } catch (SemanticException e) { throw e; } catch (Exception e) { @@ -265,7 +167,123 @@ private void parsePartitionSpec(ASTNode tableNode, LinkedHashMap } } - private AddPartitionDesc getBaseAddPartitionDescFromPartition( + public static boolean prepareImport( + boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnCreateDb, + String parsedLocation, String parsedTableName, String parsedDbName, + LinkedHashMap parsedPartSpec, + String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x + ) throws IOException, MetaException, HiveException, URISyntaxException { + + // initialize load path + URI fromURI = EximUtil.getValidatedURI(x.getConf(), stripQuotes(fromLocn)); + Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); + + FileSystem fs = FileSystem.get(fromURI, x.getConf()); + x.getInputs().add(toReadEntity(fromPath, x.getConf())); + + EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData(); + try { + rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME)); + } catch (IOException e) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); + } + + ReplicationSpec replicationSpec = rv.getReplicationSpec(); + if (replicationSpec.isNoop()){ + // nothing to do here, silently return. + return false; + } + + String dbname = SessionState.get().getCurrentDatabase(); + if ((parsedDbName !=null) && (!parsedDbName.isEmpty())){ + // If the parsed statement contained a db.tablename specification, prefer that. + dbname = parsedDbName; + } + + // Create table associated with the import + // Executed if relevant, and used to contain all the other details about the table if not. + CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable()); + + if (isExternalSet){ + tblDesc.setExternal(isExternalSet); + // This condition-check could have been avoided, but to honour the old + // default of not calling if it wasn't set, we retain that behaviour. 
+ // TODO:cleanup after verification that the outer if isn't really needed here + } + + if (isLocationSet){ + tblDesc.setLocation(parsedLocation); + x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf())); + } + + if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){ + tblDesc.setTableName(parsedTableName); + } + + List partitionDescs = new ArrayList(); + Iterable partitions = rv.getPartitions(); + for (Partition partition : partitions) { + // TODO: this should ideally not create AddPartitionDesc per partition + AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition); + partitionDescs.add(partsDesc); + } + + if (isPartSpecSet){ + // The import specification asked for only a particular partition to be loaded + // We load only that, and ignore all the others. + boolean found = false; + for (Iterator partnIter = partitionDescs + .listIterator(); partnIter.hasNext();) { + AddPartitionDesc addPartitionDesc = partnIter.next(); + if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) { + found = true; + } else { + partnIter.remove(); + } + } + if (!found) { + throw new SemanticException( + ErrorMsg.INVALID_PARTITION + .getMsg(" - Specified partition not found in import directory")); + } + } + + if (tblDesc.getTableName() == null) { + // Either we got the tablename from the IMPORT statement (first priority) + // or from the export dump. + throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg()); + } else { + x.getConf().set("import.destination.table", tblDesc.getTableName()); + for (AddPartitionDesc addPartitionDesc : partitionDescs) { + addPartitionDesc.setTableName(tblDesc.getTableName()); + } + } + + Warehouse wh = new Warehouse(x.getConf()); + Table table = tableIfExists(tblDesc, x.getHive()); + boolean tableExists = false; + + if (table != null){ + checkTable(table, tblDesc,replicationSpec, x.getConf()); + x.getLOG().debug("table " + tblDesc.getTableName() + " exists: metadata checked"); + tableExists = true; + } + + if (!replicationSpec.isInReplicationScope()){ + createRegularImportTasks( + tblDesc, partitionDescs, + isPartSpecSet, replicationSpec, table, + fromURI, fs, wh, x); + } else { + createReplImportTasks( + tblDesc, partitionDescs, + isPartSpecSet, replicationSpec, waitOnCreateDb, table, + fromURI, fs, wh, x); + } + return tableExists; + } + + private static AddPartitionDesc getBaseAddPartitionDescFromPartition( Path fromPath, String dbname, CreateTableDesc tblDesc, Partition partition) throws MetaException { AddPartitionDesc partsDesc = new AddPartitionDesc(dbname, tblDesc.getTableName(), EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()), @@ -284,7 +302,7 @@ private AddPartitionDesc getBaseAddPartitionDescFromPartition( return partsDesc; } - private CreateTableDesc getBaseCreateTableDescFromTable(String dbName, + private static CreateTableDesc getBaseCreateTableDescFromTable(String dbName, org.apache.hadoop.hive.metastore.api.Table table) { if ((table.getPartitionKeys() == null) || (table.getPartitionKeys().size() == 0)){ table.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); @@ -318,94 +336,95 @@ private CreateTableDesc getBaseCreateTableDescFromTable(String dbName, return tblDesc; } - private Task loadTable(URI fromURI, Table table, boolean replace, Path tgtPath) { + private static Task loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, + ReplicationSpec replicationSpec, 
EximUtil.SemanticAnalyzerWrapperContext x) { Path dataPath = new Path(fromURI.toString(), "data"); - Path tmpPath = ctx.getExternalTmpPath(tgtPath); - Task copyTask = TaskFactory.get(new CopyWork(dataPath, - tmpPath, false), conf); + Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath); + Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf()); LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, Utilities.getTableDesc(table), new TreeMap(), replace); - Task loadTableTask = TaskFactory.get(new MoveWork(getInputs(), - getOutputs(), loadTableWork, null, false), conf); + Task loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(), + x.getOutputs(), loadTableWork, null, false), x.getConf()); copyTask.addDependentTask(loadTableTask); - rootTasks.add(copyTask); + x.getTasks().add(copyTask); return loadTableTask; } - private Task createTableTask(CreateTableDesc tableDesc){ + private static Task createTableTask(CreateTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x){ return TaskFactory.get(new DDLWork( - getInputs(), - getOutputs(), + x.getInputs(), + x.getOutputs(), tableDesc - ), conf); + ), x.getConf()); } - private Task dropTableTask(Table table){ + private static Task dropTableTask(Table table, EximUtil.SemanticAnalyzerWrapperContext x){ return TaskFactory.get(new DDLWork( - getInputs(), - getOutputs(), + x.getInputs(), + x.getOutputs(), new DropTableDesc(table.getTableName(), null, true, true, null) - ), conf); + ), x.getConf()); } - private Task alterTableTask(CreateTableDesc tableDesc) { + private static Task alterTableTask(CreateTableDesc tableDesc, + EximUtil.SemanticAnalyzerWrapperContext x) { tableDesc.setReplaceMode(true); return TaskFactory.get(new DDLWork( - getInputs(), - getOutputs(), + x.getInputs(), + x.getOutputs(), tableDesc - ), conf); + ), x.getConf()); } - private Task alterSinglePartition( + private static Task alterSinglePartition( URI fromURI, FileSystem fs, CreateTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, - ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn) { + ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn, + EximUtil.SemanticAnalyzerWrapperContext x) { addPartitionDesc.setReplaceMode(true); addPartitionDesc.getPartition(0).setLocation(ptn.getLocation()); // use existing location return TaskFactory.get(new DDLWork( - getInputs(), - getOutputs(), + x.getInputs(), + x.getOutputs(), addPartitionDesc - ), conf); + ), x.getConf()); } - - private Task addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc, + private static Task addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc, Table table, Warehouse wh, - AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec) + AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, IOException, HiveException { AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); if (tblDesc.isExternal() && tblDesc.getLocation() == null) { - LOG.debug("Importing in-place: adding AddPart for partition " + x.getLOG().debug("Importing in-place: adding AddPart for partition " + partSpecToString(partSpec.getPartSpec())); // addPartitionDesc already has the right partition location - Task addPartTask = TaskFactory.get(new DDLWork(getInputs(), - getOutputs(), addPartitionDesc), conf); + Task addPartTask = TaskFactory.get(new DDLWork(x.getInputs(), + 
x.getOutputs(), addPartitionDesc), x.getConf()); return addPartTask; } else { String srcLocation = partSpec.getLocation(); - fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec); - LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition " + fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec, x); + x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition " + partSpecToString(partSpec.getPartSpec()) + " with source location: " + srcLocation); Path tgtLocation = new Path(partSpec.getLocation()); - Path tmpPath = ctx.getExternalTmpPath(tgtLocation); - Task copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), - tmpPath, false), conf); - Task addPartTask = TaskFactory.get(new DDLWork(getInputs(), - getOutputs(), addPartitionDesc), conf); + Path tmpPath = x.getCtx().getExternalTmpPath(tgtLocation); + Task copyTask = ReplCopyTask.getLoadCopyTask( + replicationSpec, new Path(srcLocation), tmpPath, x.getConf()); + Task addPartTask = TaskFactory.get(new DDLWork(x.getInputs(), + x.getOutputs(), addPartitionDesc), x.getConf()); LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), true); loadTableWork.setInheritTableSpecs(false); Task loadPartTask = TaskFactory.get(new MoveWork( - getInputs(), getOutputs(), loadTableWork, null, false), - conf); + x.getInputs(), x.getOutputs(), loadTableWork, null, false), + x.getConf()); copyTask.addDependentTask(loadPartTask); addPartTask.addDependentTask(loadPartTask); - rootTasks.add(copyTask); + x.getTasks().add(copyTask); return addPartTask; } } @@ -413,17 +432,18 @@ private CreateTableDesc getBaseCreateTableDescFromTable(String dbName, /** * Helper method to set location properly in partSpec */ - private void fixLocationInPartSpec( + private static void fixLocationInPartSpec( FileSystem fs, CreateTableDesc tblDesc, Table table, Warehouse wh, ReplicationSpec replicationSpec, - AddPartitionDesc.OnePartitionDesc partSpec) throws MetaException, HiveException, IOException { + AddPartitionDesc.OnePartitionDesc partSpec, + EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, HiveException, IOException { Path tgtPath = null; if (tblDesc.getLocation() == null) { if (table.getDataLocation() != null) { tgtPath = new Path(table.getDataLocation().toString(), Warehouse.makePartPath(partSpec.getPartSpec())); } else { - Database parentDb = db.getDatabase(tblDesc.getDatabaseName()); + Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName()); tgtPath = new Path( wh.getTablePath( parentDb, tblDesc.getTableName()), Warehouse.makePartPath(partSpec.getPartSpec())); @@ -432,22 +452,23 @@ private void fixLocationInPartSpec( tgtPath = new Path(tblDesc.getLocation(), Warehouse.makePartPath(partSpec.getPartSpec())); } - FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf); - checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec); + FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf()); + checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x); partSpec.setLocation(tgtPath.toString()); } - private void checkTargetLocationEmpty(FileSystem fs, Path targetPath, ReplicationSpec replicationSpec) + private static void checkTargetLocationEmpty(FileSystem fs, Path targetPath, ReplicationSpec replicationSpec, + EximUtil.SemanticAnalyzerWrapperContext x) throws IOException, SemanticException { if (replicationSpec.isInReplicationScope()){ // replication scope allows replacement, and does not require empty directories 
return; } - LOG.debug("checking emptiness of " + targetPath.toString()); + x.getLOG().debug("checking emptiness of " + targetPath.toString()); if (fs.exists(targetPath)) { FileStatus[] status = fs.listStatus(targetPath, FileUtils.HIDDEN_FILES_PATH_FILTER); if (status.length > 0) { - LOG.debug("Files inc. " + status[0].getPath().toString() + x.getLOG().debug("Files inc. " + status[0].getPath().toString() + " found in path : " + targetPath.toString()); throw new SemanticException(ErrorMsg.TABLE_DATA_EXISTS.getMsg()); } @@ -469,7 +490,7 @@ private static String partSpecToString(Map partSpec) { return sb.toString(); } - private void checkTable(Table table, CreateTableDesc tableDesc, ReplicationSpec replicationSpec) + private static void checkTable(Table table, CreateTableDesc tableDesc, ReplicationSpec replicationSpec, HiveConf conf) throws SemanticException, URISyntaxException { // This method gets called only in the scope that a destination table already exists, so // we're validating if the table is an appropriate destination to import into @@ -681,25 +702,33 @@ private static String checkParams(Map map1, /** * Create tasks for regular import, no repl complexity + * @param tblDesc + * @param partitionDescs + * @param isPartSpecSet + * @param replicationSpec + * @param table + * @param fromURI + * @param fs + * @param wh */ - private void createRegularImportTasks( - List> rootTasks, + private static void createRegularImportTasks( CreateTableDesc tblDesc, List partitionDescs, boolean isPartSpecSet, ReplicationSpec replicationSpec, - Table table, URI fromURI, FileSystem fs, Warehouse wh) + Table table, URI fromURI, FileSystem fs, Warehouse wh, EximUtil.SemanticAnalyzerWrapperContext x) throws HiveException, URISyntaxException, IOException, MetaException { if (table != null){ if (table.isPartitioned()) { - LOG.debug("table partitioned"); + x.getLOG().debug("table partitioned"); for (AddPartitionDesc addPartitionDesc : partitionDescs) { Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; - if ((ptn = db.getPartition(table, partSpec, false)) == null) { - rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec)); + if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { + x.getTasks().add(addSinglePartition( + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); } else { throw new SemanticException( ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec))); @@ -707,35 +736,35 @@ private void createRegularImportTasks( } } else { - LOG.debug("table non-partitioned"); + x.getLOG().debug("table non-partitioned"); // ensure if destination is not empty only for regular import Path tgtPath = new Path(table.getDataLocation().toString()); - FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf); - checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec); - loadTable(fromURI, table, false, tgtPath); + FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf()); + checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x); + loadTable(fromURI, table, false, tgtPath, replicationSpec,x); } // Set this to read because we can't overwrite any existing partitions - outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); + x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); } else { - LOG.debug("table " + tblDesc.getTableName() + " does not exist"); + x.getLOG().debug("table " + tblDesc.getTableName() + " does not 
exist"); - Task t = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), tblDesc), conf); + Task t = TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), tblDesc), x.getConf()); table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName()); - Database parentDb = db.getDatabase(tblDesc.getDatabaseName()); + Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName()); // Since we are going to be creating a new table in a db, we should mark that db as a write entity // so that the auth framework can go to work there. - outputs.add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED)); + x.getOutputs().add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED)); if (isPartitioned(tblDesc)) { for (AddPartitionDesc addPartitionDesc : partitionDescs) { t.addDependentTask( - addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec)); + addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); } } else { - LOG.debug("adding dependent CopyWork/MoveWork for table"); + x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); if (tblDesc.isExternal() && (tblDesc.getLocation() == null)) { - LOG.debug("Importing in place, no emptiness check, no copying/loading"); + x.getLOG().debug("Importing in place, no emptiness check, no copying/loading"); Path dataPath = new Path(fromURI.toString(), "data"); tblDesc.setLocation(dataPath.toString()); } else { @@ -745,23 +774,24 @@ private void createRegularImportTasks( } else { tablePath = wh.getTablePath(parentDb, tblDesc.getTableName()); } - FileSystem tgtFs = FileSystem.get(tablePath.toUri(), conf); - checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec); - t.addDependentTask(loadTable(fromURI, table, false, tablePath)); + FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf()); + checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x); + t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x)); } } - rootTasks.add(t); + x.getTasks().add(t); } } /** * Create tasks for repl import */ - private void createReplImportTasks( - List> rootTasks, + private static void createReplImportTasks( CreateTableDesc tblDesc, List partitionDescs, - boolean isPartSpecSet, ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh) + boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnCreateDb, + Table table, URI fromURI, FileSystem fs, Warehouse wh, + EximUtil.SemanticAnalyzerWrapperContext x) throws HiveException, URISyntaxException, IOException, MetaException { Task dr = null; @@ -774,7 +804,7 @@ private void createReplImportTasks( // So, we check the repl.last.id of the destination, and if it's newer, we no-op. If it's older, we // drop and re-create. if (replicationSpec.allowReplacementInto(table)){ - dr = dropTableTask(table); + dr = dropTableTask(table, x); lockType = WriteEntity.WriteType.DDL_EXCLUSIVE; table = null; // null it out so we go into the table re-create flow. } else { @@ -782,12 +812,29 @@ private void createReplImportTasks( } } - Database parentDb = db.getDatabase(tblDesc.getDatabaseName()); + // Normally, on import, trying to create a table or a partition in a db that does not yet exist + // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying + // to create tasks to create a table inside a db that as-of-now does not exist, but there is + // a precursor Task waiting that will create it before this is encountered. 
Thus, we instantiate + // defaults and do not error out in that case. + Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName()); if (parentDb == null){ - throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName())); + if (!waitOnCreateDb){ + throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName())); + } } if (tblDesc.getLocation() == null) { - tblDesc.setLocation(wh.getTablePath(parentDb, tblDesc.getTableName()).toString()); + if (!waitOnCreateDb){ + tblDesc.setLocation(wh.getTablePath(parentDb, tblDesc.getTableName()).toString()); + } else { + tblDesc.setLocation( + wh.getDnsPath(new Path( + wh.getDefaultDatabasePath(tblDesc.getDatabaseName()), + MetaStoreUtils.encodeTableName(tblDesc.getTableName().toLowerCase()) + ) + ).toString()); + + } } /* Note: In the following section, Metadata-only import handling logic is @@ -807,49 +854,52 @@ private void createReplImportTasks( lockType = WriteEntity.WriteType.DDL_SHARED; } - Task t = createTableTask(tblDesc); + Task t = createTableTask(tblDesc, x); table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName()); if (!replicationSpec.isMetadataOnly()) { if (isPartitioned(tblDesc)) { for (AddPartitionDesc addPartitionDesc : partitionDescs) { t.addDependentTask( - addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec)); + addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); } } else { - LOG.debug("adding dependent CopyWork/MoveWork for table"); - t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()))); + x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); + t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),replicationSpec, x)); } } if (dr == null){ // Simply create - rootTasks.add(t); + x.getTasks().add(t); } else { // Drop and recreate dr.addDependentTask(t); - rootTasks.add(dr); + x.getTasks().add(dr); } } else { // Table existed, and is okay to replicate into, not dropping and re-creating. if (table.isPartitioned()) { - LOG.debug("table partitioned"); + x.getLOG().debug("table partitioned"); for (AddPartitionDesc addPartitionDesc : partitionDescs) { Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; - if ((ptn = db.getPartition(table, partSpec, false)) == null) { + if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { if (!replicationSpec.isMetadataOnly()){ - rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec)); + x.getTasks().add(addSinglePartition( + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); } } else { // If replicating, then the partition already existing means we need to replace, maybe, if // the destination ptn's repl.last.id is older than the replacement's. 
if (replicationSpec.allowReplacementInto(ptn)){ if (!replicationSpec.isMetadataOnly()){ - rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec)); + x.getTasks().add(addSinglePartition( + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); } else { - rootTasks.add(alterSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn)); + x.getTasks().add(alterSinglePartition( + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); } if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; @@ -862,31 +912,31 @@ private void createReplImportTasks( } if (replicationSpec.isMetadataOnly() && partitionDescs.isEmpty()){ // MD-ONLY table alter - rootTasks.add(alterTableTask(tblDesc)); + x.getTasks().add(alterTableTask(tblDesc, x)); if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; } } } else { - LOG.debug("table non-partitioned"); + x.getLOG().debug("table non-partitioned"); if (!replicationSpec.allowReplacementInto(table)){ return; // silently return, table is newer than our replacement. } if (!replicationSpec.isMetadataOnly()) { - loadTable(fromURI, table, true, new Path(fromURI)); // repl-imports are replace-into + loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x); // repl-imports are replace-into } else { - rootTasks.add(alterTableTask(tblDesc)); + x.getTasks().add(alterTableTask(tblDesc, x)); } if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; } } } - outputs.add(new WriteEntity(table,lockType)); + x.getOutputs().add(new WriteEntity(table,lockType)); } - private boolean isPartitioned(CreateTableDesc tblDesc) { + private static boolean isPartitioned(CreateTableDesc tblDesc) { return !(tblDesc.getPartCols() == null || tblDesc.getPartCols().isEmpty()); } @@ -894,7 +944,7 @@ private boolean isPartitioned(CreateTableDesc tblDesc) { * Utility method that returns a table if one corresponding to the destination * tblDesc is found. Returns null if no such table is found. */ - private Table tableIfExists(CreateTableDesc tblDesc) throws HiveException { + private static Table tableIfExists(CreateTableDesc tblDesc, Hive db) throws HiveException { try { return db.getTable(tblDesc.getDatabaseName(),tblDesc.getTableName()); } catch (InvalidTableException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index a7005f1..6726d44 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -69,7 +69,7 @@ public LoadSemanticAnalyzer(QueryState queryState) throws SemanticException { @Override public boolean accept(Path p) { String name = p.getName(); - return name.equals("_metadata") ? true : !name.startsWith("_") && !name.startsWith("."); + return name.equals(EximUtil.METADATA_NAME) ? 
true : !name.startsWith("_") && !name.startsWith("."); } }); if ((srcs != null) && srcs.length == 1) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java index a17696a..db624c0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hive.metastore.events.PreEventContext; import org.apache.hadoop.hive.metastore.events.PreEventContext.PreEventType; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; + /** * This class listens for drop events and, if set, exports the table's metadata as JSON to the trash * of the user performing the drop @@ -83,7 +83,7 @@ private void export_meta_data(PreDropTableEvent tableEvent) throws MetaException } catch (IOException e) { throw new MetaException(e.getMessage()); } - Path outFile = new Path(metaPath, name + ImportSemanticAnalyzer.METADATA_NAME); + Path outFile = new Path(metaPath, name + EximUtil.METADATA_NAME); try { SessionState.getConsole().printInfo("Beginning metadata export"); EximUtil.createExportDump(fs, outFile, mTbl, null, null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java new file mode 100644 index 0000000..a4dfa3a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -0,0 +1,533 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse; + +import org.antlr.runtime.tree.Tree; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.io.IOUtils; + +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hive.ql.parse.HiveParser.*; + +public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { + // Database name or pattern + private String dbNameOrPattern; + // Table name or pattern + private String tblNameOrPattern; + private Integer eventFrom; + private Integer eventTo; + private Integer batchSize; + // Base path for REPL LOAD + private String path; + + public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { + super(queryState); + } + + @Override + public void analyzeInternal(ASTNode ast) throws SemanticException { + LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal"); + LOG.debug(ast.getName() + ":" + ast.getToken().getText() + "=" + ast.getText()); + switch (ast.getToken().getType()) { + case TOK_REPL_DUMP: { + LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump"); + initReplDump(ast); + analyzeReplDump(ast); + break; + } + case TOK_REPL_LOAD: { + LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: load"); + initReplLoad(ast); + analyzeReplLoad(ast); + break; + } + case TOK_REPL_STATUS: { + LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: status"); + initReplStatus(ast); + analyzeReplStatus(ast); + break; + } + default: { + throw new SemanticException("Unexpected root token"); + } + } + } + + private void initReplDump(ASTNode ast) { + int numChildren = ast.getChildCount(); + dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText()); + // skip the first node, which is always required + int currNode = 1; + while (currNode < numChildren) { + if (ast.getChild(currNode).getType() != TOK_FROM) { + // optional tblName was specified.
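+ // For reference, the optional children handled below follow the shapes asserted in
+ // TestReplicationSemanticAnalyzer, e.g. (illustrative event ids and batch size):
+ //   REPL DUMP default.testtbl FROM 100 TO 200 BATCH 50
+ //   => TOK_REPL_DUMP(default, testtbl, TOK_FROM(100, TOK_TO, 200, TOK_BATCH, 50))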
+ tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(currNode).getText()); + } else { + // TOK_FROM subtree + Tree fromNode = ast.getChild(currNode); + eventFrom = Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(0).getText())); + // skip the first, which is always required + int numChild = 1; + while (numChild < fromNode.getChildCount()) { + if (fromNode.getChild(numChild).getType() == TOK_TO) { + eventTo = + Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText())); + // skip the next child, since we already took care of it + numChild++; + } else if (fromNode.getChild(numChild).getType() == TOK_BATCH) { + batchSize = + Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText())); + // skip the next child, since we already took care of it + numChild++; + } + // move to the next child in FROM tree + numChild++; + } + // FROM node is always the last + break; + } + // move to the next root node + currNode++; + } + } + + // REPL DUMP + private void analyzeReplDump(ASTNode ast) throws SemanticException { + // FIXME: support non-bootstrap: use eventFrom/eventTo/batchSize + LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern) + + "." + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to " + + String.valueOf(eventTo) + " batchsize " + String.valueOf(batchSize)); + String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR); + Path dumpRoot = new Path(replRoot, getNextDumpDir()); + try { + for (String dbName : matchesDb(dbNameOrPattern)) { + LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName); + Path dbRoot = dumpDbMetadata(dbName, dumpRoot); + for (String tblName : matchesTbl(dbName, tblNameOrPattern)) { + LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName + + " to db root " + dbRoot.toUri()); + dumpTbl(ast, dbName, tblName, dbRoot); + } + } + String currentReplId = + String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId()); + prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId), + "dump_dir,last_repl_id#string,string"); + } catch (Exception e) { + // TODO : simple wrap & rethrow for now, clean up with error codes + throw new SemanticException(e); + } + } + + String getNextDumpDir() { + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { + return "next"; + // make it easy to write unit tests, instead of unique id generation. + // however, this does mean that in writing tests, we have to be aware that + // repl dump will clash with prior dumps, and thus have to clean up properly. + } else { + return String.valueOf(System.currentTimeMillis()); + // TODO: time good enough for now - we'll likely improve this. + // We may also work in something the equivalent of pid, thrid and move to nanos to ensure + // uniqueness. + } + } + + /** + * + * @param dbName + * @param dumpRoot + * @return db dumped path + * @throws SemanticException + */ + private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticException { + Path dbRoot = new Path(dumpRoot, dbName); + try { + // TODO : instantiating FS objects are generally costly. 
Refactor + FileSystem fs = dbRoot.getFileSystem(conf); + Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME); + Database dbObj = db.getDatabase(dbName); + EximUtil.createDbExportDump(fs, dumpPath, dbObj, getNewReplicationSpec()); + } catch (Exception e) { + // TODO : simple wrap & rethrow for now, clean up with error codes + throw new SemanticException(e); + } + return dbRoot; + } + + /** + * + * @param ast + * @param dbName + * @param tblName + * @param dbRoot + * @return tbl dumped path + * @throws SemanticException + */ + private Path dumpTbl(ASTNode ast, String dbName, String tblName, Path dbRoot) throws SemanticException { + Path tableRoot = new Path(dbRoot, tblName); + try { + URI toURI = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString()); + TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null); + ExportSemanticAnalyzer.prepareExport(ast, toURI, ts, getNewReplicationSpec(), db, conf, ctx, + rootTasks, inputs, outputs, LOG); + } catch (HiveException e) { + // TODO : simple wrap & rethrow for now, clean up with error codes + throw new SemanticException(e); + } + return tableRoot; + } + + // REPL LOAD + private void initReplLoad(ASTNode ast) { + int numChildren = ast.getChildCount(); + path = PlanUtils.stripQuotes(ast.getChild(0).getText()); + if (numChildren > 1) { + dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(1).getText()); + } + if (numChildren > 2) { + tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(2).getText()); + } + } + + /* + * Example dump dirs we need to be able to handle : + * + * for: hive.repl.rootdir = staging/ Then, repl dumps will be created in staging/ + * + * single-db-dump: staging/blah12345 blah12345/ default/ _metadata tbl1/ _metadata dt=20160907/ + * _files tbl2/ tbl3/ unptn_tbl/ _metadata _files + * + * multi-db-dump: staging/bar12347 staging/ bar12347/ default/ ... sales/ ... + * + * single table-dump: staging/baz123 staging/ baz123/ _metadata dt=20150931/ _files + */ + private void analyzeReplLoad(ASTNode ast) throws SemanticException { + LOG.debug("ReplSemanticAnalyzer.analyzeReplLoad: " + String.valueOf(dbNameOrPattern) + "." + + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(path)); + + // for analyze repl load, we walk through the dir structure available in the path, + // looking at each db, and then each table, and then setting up the appropriate + // import job in its place. + + // FIXME : handle non-bootstrap cases. + + // We look at the path, and go through each subdir. + // Each subdir corresponds to a database. + // For each subdir, there is a _metadata file which allows us to re-impress the db object + // After each db object is loaded appropriately, iterate through the sub-table dirs, and pretend + // that we had an IMPORT on each of them, into this db. + + try { + + Path loadPath = new Path(path); + final FileSystem fs = loadPath.getFileSystem(conf); + + if (!fs.exists(loadPath)) { + // supposed dump path does not exist. + throw new FileNotFoundException(loadPath.toUri().toString()); + } + + // Now, the dumped path can be one of two things: + // a) It can be a db dump, in which case we expect a set of dirs, each with a + // db name, and with a _metadata file in each, and table dirs inside that. + // b) It can be a table dump dir, in which case we expect a _metadata dump of + // a table in question in the dir, and individual ptn dir hierarchy. + // Once we expand this into doing incremental repl, we can have individual events which can + // be other things like roles and fns as well. 
Also, if tblname is specified, we're guaranteed + // that this is a tbl-level dump, and it is an error condition if we find anything else. Also, + // if dbname is specified, we expect exactly one db dumped, and having more is an error + // condition. + + if ((tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { + analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null); + return; + } + + FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath); + if (srcs == null || (srcs.length == 0)) { + throw new FileNotFoundException(loadPath.toUri().toString()); + } + + FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs)); + + if ((dirsInLoadPath == null) || (dirsInLoadPath.length == 0)) { + throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString()); + } + + if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) { + LOG.debug("Found multiple dirs when we expected 1:"); + for (FileStatus d : dirsInLoadPath) { + LOG.debug("> " + d.getPath().toUri().toString()); + } + throw new IllegalArgumentException( + "Multiple dirs in " + + loadPath.toUri().toString() + + " does not correspond to REPL LOAD expecting to load to a singular destination point."); + } + + for (FileStatus dir : dirsInLoadPath) { + analyzeDatabaseLoad(dbNameOrPattern, fs, dir); + } + + } catch (Exception e) { + // TODO : simple wrap & rethrow for now, clean up with error codes + throw new SemanticException(e); + } + + } + + private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir) + throws SemanticException { + try { + // Path being passed to us is a db dump location. We go ahead and load as needed. + // dbName might be null or empty, in which case we keep the original db name for the new + // database creation + + // Two steps here - first, we read the _metadata file here, and create a CreateDatabaseDesc + // associated with that + // Then, we iterate over all subdirs, and create table imports for each. + + EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData(); + try { + rv = EximUtil.readMetaData(fs, new Path(dir.getPath(), EximUtil.METADATA_NAME)); + } catch (IOException e) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); + } + + Database dbObj = rv.getDatabase(); + + if (dbObj == null) { + throw new IllegalArgumentException( + "_metadata file read did not contain a db object - invalid dump."); + } + + if ((dbName == null) || (dbName.isEmpty())) { + // We use dbName specified as long as it is not null/empty. If so, then we use the original + // name + // recorded in the thrift object. + dbName = dbObj.getName(); + } + + CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(); + createDbDesc.setName(dbName); + createDbDesc.setComment(dbObj.getDescription()); + createDbDesc.setDatabaseProperties(dbObj.getParameters()); + // note that we do not set location - for repl load, we want that auto-created. + + createDbDesc.setIfNotExists(false); + // If it exists, we want this to be an error condition. Repl Load is not intended to replace a + // db. + // TODO: we might revisit this in create-drop-recreate cases, needs some thinking on. 
+ Task<? extends Serializable> createDbTask = TaskFactory.get(new DDLWork(inputs, outputs, createDbDesc), conf); + rootTasks.add(createDbTask); + + FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs)); + + for (FileStatus tableDir : dirsInDbPath) { + analyzeTableLoad(dbName, null, tableDir.getPath().toUri().toString(), createDbTask); + } + } catch (Exception e) { + throw new SemanticException(e); + } + } + + private void analyzeTableLoad(String dbName, String tblName, String locn, + Task<? extends Serializable> precursor) throws SemanticException { + // Path being passed to us is a table dump location. We go ahead and load it in as needed. + // If tblName is null, we default to the table name specified in _metadata; if both dbName and + // tblName are specified, that is the name we are intended to create the new table as. + if (dbName == null || dbName.isEmpty()) { + throw new SemanticException("Database name cannot be null for a table load"); + } + try { + // no location set on repl loads + boolean isLocationSet = false; + // all repl imports are non-external + boolean isExternalSet = false; + // bootstrap loads are not partition level + boolean isPartSpecSet = false; + // repl loads are not partition level + LinkedHashMap<String, String> parsedPartSpec = null; + // no location for repl imports + String parsedLocation = null; + boolean waitOnCreateDb = false; + List<Task<? extends Serializable>> importTasks = null; + if (precursor == null) { + importTasks = rootTasks; + waitOnCreateDb = false; + } else { + importTasks = new ArrayList<Task<? extends Serializable>>(); + waitOnCreateDb = true; + } + EximUtil.SemanticAnalyzerWrapperContext x = + new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, importTasks, LOG, + ctx); + ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet, + waitOnCreateDb, parsedLocation, tblName, dbName, parsedPartSpec, locn, x); + + if (precursor != null) { + for (Task<? extends Serializable> t : importTasks) { + precursor.addDependentTask(t); + } + } + + } catch (Exception e) { + throw new SemanticException(e); + } + } + + // REPL STATUS + private void initReplStatus(ASTNode ast) { + int numChildren = ast.getChildCount(); + dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText()); + if (numChildren > 1) { + tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(1).getText()); + } + } + + private void analyzeReplStatus(ASTNode ast) throws SemanticException { + LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: " + String.valueOf(dbNameOrPattern) + + "."
+ String.valueOf(tblNameOrPattern)); + + String replLastId = null; + + try { + if (tblNameOrPattern != null) { + // Checking for status of table + Table tbl = db.getTable(dbNameOrPattern, tblNameOrPattern); + if (tbl != null) { + inputs.add(new ReadEntity(tbl)); + Map<String, String> params = tbl.getParameters(); + if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { + replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + } + } + } else { + // Checking for status of a db + Database database = db.getDatabase(dbNameOrPattern); + if (database != null) { + inputs.add(new ReadEntity(database)); + Map<String, String> params = database.getParameters(); + if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { + replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + } + } + } + } catch (HiveException e) { + throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error + // codes + } + + LOG.debug("RSTATUS: writing repl.last.id=" + String.valueOf(replLastId) + " out to " + + ctx.getResFile()); + prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string"); + } + + private void prepareReturnValues(List<String> values, String schema) throws SemanticException { + LOG.debug("prepareReturnValues : " + schema); + for (String s : values) { + LOG.debug(" > " + s); + } + + ctx.setResFile(ctx.getLocalTmpPath()); + // FIXME : this should not be accessible by the user if we write to it from the frontend. + // Thus, we should Desc/Work this, otherwise there is a security issue here. + // Note: if we don't call ctx.setResFile, we get a NPE from the following code section. + // If we do call it, then FetchWork winds up thinking that the "table" here is a partitioned + // dir, which does not work. Thus, this does not work. + + writeOutput(values); + } + + private void writeOutput(List<String> values) throws SemanticException { + Path outputFile = ctx.getResFile(); + FileSystem fs = null; + DataOutputStream outStream = null; + try { + fs = outputFile.getFileSystem(conf); + outStream = fs.create(outputFile); + outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0))); + for (int i = 1; i < values.size(); i++) { + outStream.write(Utilities.ctrlaCode); + outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i))); + } + outStream.write(Utilities.newLineCode); + } catch (IOException e) { + throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error + // codes + } finally { + IOUtils.closeStream(outStream); // TODO : we have other closes here, and in ReplCopyTask - + // replace with this + } + } + + private ReplicationSpec getNewReplicationSpec() throws SemanticException { + try { + ReplicationSpec replicationSpec = + new ReplicationSpec(true, false, "replv2", "will-be-set", false, true); + replicationSpec.setCurrentReplicationState(String.valueOf(db.getMSC() + .getCurrentNotificationEventId().getEventId())); + return replicationSpec; + } catch (Exception e) { + throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error + // codes + } + } + + private Iterable<String> matchesTbl(String dbName, String tblPattern) + throws HiveException { + if (tblPattern == null) { + return db.getAllTables(dbName); + } else { + return db.getTablesByPattern(dbName, tblPattern); + } + } + + private Iterable<String> matchesDb(String dbPattern) throws HiveException { + if (dbPattern == null) { + return db.getAllDatabases(); + } else { + return db.getDatabasesByPattern(dbPattern); + } + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 4668271..824cf11 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -42,6 +42,7 @@ private String eventId = null; private String currStateId = null; private boolean isNoop = false; + private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
// Key definitions related to replication @@ -49,8 +50,9 @@ REPL_SCOPE("repl.scope"), EVENT_ID("repl.event.id"), CURR_STATE_ID("repl.last.id"), - NOOP("repl.noop"); - + NOOP("repl.noop"), + LAZY("repl.lazy"), + ; private final String keyName; KEY(String s) { @@ -102,32 +104,32 @@ public ReplicationSpec(){ this((ASTNode)null); } - public ReplicationSpec( - boolean isInReplicationScope, boolean isMetadataOnly, String eventReplicationState, - String currentReplicationState, boolean isNoop){ + public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, + String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isLazy) { this.isInReplicationScope = isInReplicationScope; this.isMetadataOnly = isMetadataOnly; this.eventId = eventReplicationState; this.currStateId = currentReplicationState; this.isNoop = isNoop; + this.isLazy = isLazy; } public ReplicationSpec(Function keyFetcher) { String scope = keyFetcher.apply(ReplicationSpec.KEY.REPL_SCOPE.toString()); this.isMetadataOnly = false; this.isInReplicationScope = false; - if (scope != null){ - if (scope.equalsIgnoreCase("metadata")){ + if (scope != null) { + if (scope.equalsIgnoreCase("metadata")) { this.isMetadataOnly = true; this.isInReplicationScope = true; - } else if (scope.equalsIgnoreCase("all")){ + } else if (scope.equalsIgnoreCase("all")) { this.isInReplicationScope = true; } } this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString()); this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString()); - this.isNoop = Boolean.parseBoolean( - keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString())); + this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString())); + this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString())); } /** @@ -280,6 +282,21 @@ public void setNoop(boolean isNoop) { this.isNoop = isNoop; } + /** + * @return whether or not the current replication action is set to be lazy + */ + public boolean isLazy() { + return isLazy; + } + + /** + * @param isLazy whether or not the current replication action should be lazy + */ + public void setLazy(boolean isLazy){ + this.isLazy = isLazy; + } + + public String get(KEY key) { switch (key){ case REPL_SCOPE: @@ -297,6 +314,8 @@ public String get(KEY key) { return getCurrentReplicationState(); case NOOP: return String.valueOf(isNoop()); + case LAZY: + return String.valueOf(isLazy()); } return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 4f0ead0..afc4b92 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -128,6 +128,10 @@ commandType.put(HiveParser.TOK_COMMIT, HiveOperation.COMMIT); commandType.put(HiveParser.TOK_ROLLBACK, HiveOperation.ROLLBACK); commandType.put(HiveParser.TOK_SET_AUTOCOMMIT, HiveOperation.SET_AUTOCOMMIT); + commandType.put(HiveParser.TOK_REPL_DUMP, HiveOperation.EXPORT); // piggyback on EXPORT security handling for now + commandType.put(HiveParser.TOK_REPL_LOAD, HiveOperation.IMPORT); // piggyback on IMPORT security handling for now + commandType.put(HiveParser.TOK_REPL_STATUS, HiveOperation.SHOW_TBLPROPERTIES); // TODO : also actually DESCDATABASE + } static { @@ -185,6 +189,12 @@ public static BaseSemanticAnalyzer get(QueryState queryState, ASTNode tree) return new 
ExportSemanticAnalyzer(queryState); case HiveParser.TOK_IMPORT: return new ImportSemanticAnalyzer(queryState); + case HiveParser.TOK_REPL_DUMP: + return new ReplicationSemanticAnalyzer(queryState); + case HiveParser.TOK_REPL_LOAD: + return new ReplicationSemanticAnalyzer(queryState); + case HiveParser.TOK_REPL_STATUS: + return new ReplicationSemanticAnalyzer(queryState); case HiveParser.TOK_ALTERTABLE: { Tree child = tree.getChild(1); switch (child.getType()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java new file mode 100644 index 0000000..1932d60 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + +/** + * Marker work for Replication - behaves similar to CopyWork, but maps to ReplCopyTask, + * which will have mechanics to list the files in source to write to the destination, + * instead of copying them, if specified, falling back to copying if needed. + */ +@Explain(displayName = "Copy for Replication", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) +public class ReplCopyWork extends CopyWork { + + protected boolean copyFiles = true; // governs copy-or-list-files behaviour. + // If set to true, behaves identically to a CopyWork + // If set to false, ReplCopyTask does a file-list of the things to be copied instead, and puts them in a file called _files. + // Default is set to mimic CopyTask, with the intent that any Replication code will explicitly flip this. + + /** + * TODO : Refactor + * + * There is an upcoming patch that refactors this bit of code. Currently, the idea is the following: + * + * By default, ReplCopyWork will behave similarly to CopyWork, and simply copy + * along data from the source to destination. If, however, listFilesOnOutput is set, + * then, instead of copying the individual files to the destination, it simply creates + * a file called _files on destination that contains the list of the original files + * that were intended to be copied. Thus, we do not actually copy the files at CopyWork + * time. + * + * The flip side of this behaviour happens when, instead, readListFromInput is set. This + * flag, if set, changes the source behaviour of this CopyTask, and instead of copying + * explicit files, this will then fall back to a behaviour wherein an _files is read from + * the source, and the files specified by the _files are then copied to the destination. 
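+ *
+ * For illustration (paths are hypothetical, and the exact line format is an assumption of
+ * this sketch), a _files listing written on the source side carries one source file URI
+ * per line, e.g.:
+ *   hdfs://source-nn:8020/user/hive/warehouse/sales/dt=20160101/000000_0
+ *   hdfs://source-nn:8020/user/hive/warehouse/sales/dt=20160101/000001_0
+ * so a ReplCopyTask run with readListFromInput set would copy exactly those two files.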
+ * + * This allows us a lazy-copy-on-source and a pull-from destination semantic that we want + * to use from replication. + * + * == + * + * The refactor intent, however, is to simplify this, so that we have only 1 flag that we set, + * called isLazy. If isLazy is set, then this is the equivalent of the current listFilesOnOutput, + * and will generate a _files file. + * + * As to the input, we simply decide on whether to use the lazy mode or not depending on the + * presence of a _files file on the input. If we see a _files on the input, we simply expand it + * to copy as needed. If we do not, we copy as normal. + * + */ + + protected boolean listFilesOnOutput = false; // governs copy-or-list-files behaviour + // If set to true, it'll iterate over input files, and for each file in the input, + // it'll write out an additional line in a _files file in the output. + // If set to false, it'll behave as a traditional CopyTask. + + protected boolean readListFromInput = false; // governs remote-fetch-input behaviour + // If set to true, we'll assume that the input has a _files file present which lists + // the actual input files to copy, and we'll pull each of those on read. + // If set to false, it'll behave as a traditional CopyTask. + + public ReplCopyWork() { + } + + public ReplCopyWork(final Path fromPath, final Path toPath) { + super(fromPath, toPath, true); + } + + public ReplCopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) { + super(fromPath, toPath, errorOnSrcEmpty); + } + + public void setListFilesOnOutputBehaviour(boolean listFilesOnOutput){ + this.listFilesOnOutput = listFilesOnOutput; + } + + public boolean getListFilesOnOutputBehaviour(){ + return this.listFilesOnOutput; + } + + public void setReadListFromInput(boolean readListFromInput){ + this.readListFromInput = readListFromInput; + } + + public boolean getReadListFromInput(){ + return this.readListFromInput; + } + + // specialization of getListFilesOnOutputBehaviour, with a filestatus arg + // we can default to the default getListFilesOnOutputBehaviour behaviour, + // or, we can do additional pattern matching to decide that certain files + // should not be listed, and copied instead, _metadata files, for instance. + // Currently, we use this to skip _metadata files, but we might decide that + // this is not the right place for it later on. + public boolean getListFilesOnOutputBehaviour(FileStatus f) { + if (f.getPath().toString().contains("_metadata")){ + return false; // always copy _metadata files + } + return this.listFilesOnOutput; + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java new file mode 100644 index 0000000..54b2bd1 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import static org.junit.Assert.*; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestReplicationSemanticAnalyzer { + static QueryState queryState; + static HiveConf conf; + static String defaultDB = "default"; + static String tblName = "testReplSA"; + static ArrayList cols = new ArrayList(Arrays.asList("col1", "col2")); + ParseDriver pd; + SemanticAnalyzer sA; + + + @BeforeClass + public static void initialize() throws HiveException { + queryState = new QueryState(new HiveConf(SemanticAnalyzer.class)); + conf = queryState.getConf(); + conf.set("hive.security.authorization.manager", ""); + SessionState.start(conf); + Hive hiveDb = Hive.get(conf); + hiveDb.createTable(defaultDB + "." + tblName, cols, null, OrcInputFormat.class, OrcOutputFormat.class); + Table t = hiveDb.getTable(tblName); + } + + @AfterClass + public static void teardown() throws HiveException { + } + + @Test + public void testReplDumpParse() throws Exception { + ParseDriver pd = new ParseDriver(); + String fromEventId = "100"; + String toEventId = "200"; + String batchSize = "50"; + ASTNode root; + ASTNode child; + + String query = "repl dump " + defaultDB; + root = (ASTNode) pd.parse(query).getChild(0); + assertEquals(root.getText(), "TOK_REPL_DUMP"); + assertEquals(root.getChildCount(), 1); + + child = (ASTNode) root.getChild(0); + assertEquals(child.getText(), defaultDB); + assertEquals(child.getChildCount(), 0); + + query = "repl dump " + defaultDB + "." + tblName; + root = (ASTNode) pd.parse(query).getChild(0); + assertEquals(root.getChildCount(), 2); + + child = (ASTNode) root.getChild(0); + assertEquals(child.getText(), defaultDB); + assertEquals(child.getChildCount(), 0); + + child = (ASTNode) root.getChild(1); + assertEquals(child.getText(), tblName); + assertEquals(child.getChildCount(), 0); + + query = "repl dump " + defaultDB + "." 
+ tblName + " from " + fromEventId; + root = (ASTNode) pd.parse(query).getChild(0); + assertEquals(root.getChildCount(), 3); + + child = (ASTNode) root.getChild(0); + assertEquals(child.getText(), defaultDB); + assertEquals(child.getChildCount(), 0); + + child = (ASTNode) root.getChild(1); + assertEquals(child.getText(), tblName); + assertEquals(child.getChildCount(), 0); + + root = (ASTNode) root.getChild(2); + assertEquals(root.getText(), "TOK_FROM"); + assertEquals(root.getChildCount(), 1); + + child = (ASTNode) root.getChild(0); + assertEquals(child.getText(), fromEventId); + assertEquals(child.getChildCount(), 0); + + query = "repl dump " + defaultDB + "." + tblName + " from " + fromEventId + " to " + toEventId; + + root = (ASTNode) pd.parse(query).getChild(0); + assertEquals(root.getChildCount(), 3); + + child = (ASTNode) root.getChild(0); + assertEquals(child.getText(), defaultDB); + assertEquals(child.getChildCount(), 0); + + child = (ASTNode) root.getChild(1); + assertEquals(child.getText(), tblName); + assertEquals(child.getChildCount(), 0); + + root = (ASTNode) root.getChild(2); + assertEquals(root.getText(), "TOK_FROM"); + assertEquals(root.getChildCount(), 3); + + child = (ASTNode) root.getChild(0); + assertEquals(child.getText(), fromEventId); + assertEquals(child.getChildCount(), 0); + + child = (ASTNode) root.getChild(1); + assertEquals(child.getText(), "TOK_TO"); + assertEquals(child.getChildCount(), 0); + + child = (ASTNode) root.getChild(2); + assertEquals(child.getText(), toEventId); + assertEquals(child.getChildCount(), 0); + + query = + "repl dump " + defaultDB + "." + tblName + " from " + fromEventId + " to " + toEventId + + " batch " + batchSize; + + root = (ASTNode) pd.parse(query).getChild(0); + assertEquals(root.getChildCount(), 3); + + child = (ASTNode) root.getChild(0); + assertEquals(child.getText(), defaultDB); + assertEquals(child.getChildCount(), 0); + + child = (ASTNode) root.getChild(1); + assertEquals(child.getText(), tblName); + assertEquals(child.getChildCount(), 0); + + root = (ASTNode) root.getChild(2); + assertEquals(root.getText(), "TOK_FROM"); + assertEquals(root.getChildCount(), 5); + + child = (ASTNode) root.getChild(0); + assertEquals(child.getText(), fromEventId); + assertEquals(child.getChildCount(), 0); + + child = (ASTNode) root.getChild(1); + assertEquals(child.getText(), "TOK_TO"); + assertEquals(child.getChildCount(), 0); + + child = (ASTNode) root.getChild(2); + assertEquals(child.getText(), toEventId); + assertEquals(child.getChildCount(), 0); + + child = (ASTNode) root.getChild(3); + assertEquals(child.getText(), "TOK_BATCH"); + assertEquals(child.getChildCount(), 0); + + child = (ASTNode) root.getChild(4); + assertEquals(child.getText(), batchSize); + assertEquals(child.getChildCount(), 0); + } + + @Test + public void testReplLoadParse() throws Exception { + // FileSystem fs = FileSystem.get(conf); + ParseDriver pd = new ParseDriver(); + ASTNode root; + ASTNode child; + String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR); + Path dumpRoot = new Path(replRoot, "next"); + System.out.println(replRoot); + System.out.println(dumpRoot); + String newDB = "default_bak"; + + String query = "repl load from '" + dumpRoot.toString() + "'"; + root = (ASTNode) pd.parse(query).getChild(0); + assertEquals(root.getText(), "TOK_REPL_LOAD"); + assertEquals(root.getChildCount(), 1); + child = (ASTNode) root.getChild(0); + assertEquals(child.getText(), "'" + dumpRoot.toString() + "'"); + assertEquals(child.getChildCount(), 0); + + query = "repl load 
" + newDB + " from '" + dumpRoot.toString() + "'"; + root = (ASTNode) pd.parse(query).getChild(0); + assertEquals(root.getText(), "TOK_REPL_LOAD"); + assertEquals(root.getChildCount(), 2); + child = (ASTNode) root.getChild(0); + assertEquals(child.getText(), "'" + dumpRoot.toString() + "'"); + assertEquals(child.getChildCount(), 0); + child = (ASTNode) root.getChild(1); + assertEquals(child.getText(), newDB); + assertEquals(child.getChildCount(), 0); + } + + // TODO: add this test after repl dump analyze generates tasks + //@Test + public void testReplDumpAnalyze() throws Exception { + + } + + //@Test + public void testReplLoadAnalyze() throws Exception { + ParseDriver pd = new ParseDriver(); + ASTNode root; + String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR); + FileSystem fs = FileSystem.get(conf); + Path dumpRoot = new Path(replRoot, "next"); + System.out.println(replRoot); + System.out.println(dumpRoot); + String newDB = "default_bak"; + + // First create a dump + String query = "repl dump " + defaultDB; + root = (ASTNode) pd.parse(query).getChild(0); + ReplicationSemanticAnalyzer rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root); + rs.analyze(root, new Context(conf)); + + // Then analyze load + query = "repl load from '" + dumpRoot.toString() + "'"; + root = (ASTNode) pd.parse(query).getChild(0); + rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root); + rs.analyze(root, new Context(conf)); + List> roots = rs.getRootTasks(); + assertEquals(1, roots.size()); + + query = "repl load " + newDB + " from '" + dumpRoot.toString() + "'"; + root = (ASTNode) pd.parse(query).getChild(0); + rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root); + rs.analyze(root, new Context(conf)); + roots = rs.getRootTasks(); + assertEquals(1, roots.size()); + } +}