diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 4fa45ae194..fb592acc94 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; @@ -159,7 +160,7 @@ public void tearDown(){ private static int next = 0; private synchronized void advanceDumpDir() { next++; - ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next)); + ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); } static class Tuple { diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index d6b97e81ce..1128eae6f9 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -32,6 +32,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; @@ -131,7 +132,7 @@ private Path mkDir(DistributedFileSystem fs, String pathString) private void advanceDumpDir() { next++; - ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next)); + ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); } private ArrayList lastResults; diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift index cc9af94dc7..dc55805221 100644 --- a/ql/if/queryplan.thrift +++ b/ql/if/queryplan.thrift @@ -100,6 +100,7 @@ enum StageType { STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, + REPLDUMP, } struct Stage { diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index e92d77696b..7254d504fe 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -115,7 +115,8 @@ int _kStageTypeValues[] = { StageType::MOVE, StageType::STATS, StageType::DEPENDENCY_COLLECTION, - StageType::COLUMNSTATS + StageType::COLUMNSTATS, + StageType::REPLDUMP }; const char* _kStageTypeNames[] = { "CONDITIONAL", @@ -129,9 +130,10 @@ const char* _kStageTypeNames[] = { "MOVE", "STATS", "DEPENDENCY_COLLECTION", - "COLUMNSTATS" + "COLUMNSTATS", + "REPLDUMP" }; -const std::map _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(12, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(13, _kStageTypeValues, _kStageTypeNames), 
::apache::thrift::TEnumIterator(-1, NULL, NULL)); Adjacency::~Adjacency() throw() { diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h index ce37b2ecac..38d054b119 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -93,7 +93,8 @@ struct StageType { MOVE = 8, STATS = 9, DEPENDENCY_COLLECTION = 10, - COLUMNSTATS = 11 + COLUMNSTATS = 11, + REPLDUMP = 12 }; }; diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index f20174cbe1..deca5745d8 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -23,7 +23,8 @@ MOVE(8), STATS(9), DEPENDENCY_COLLECTION(10), - COLUMNSTATS(11); + COLUMNSTATS(11), + REPLDUMP(12); private final int value; @@ -68,6 +69,8 @@ public static StageType findByValue(int value) { return DEPENDENCY_COLLECTION; case 11: return COLUMNSTATS; + case 12: + return REPLDUMP; default: return null; } diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php index e1693f33b4..4d902ee8a1 100644 --- a/ql/src/gen/thrift/gen-php/Types.php +++ b/ql/src/gen/thrift/gen-php/Types.php @@ -114,6 +114,7 @@ final class StageType { const STATS = 9; const DEPENDENCY_COLLECTION = 10; const COLUMNSTATS = 11; + const REPLDUMP = 12; static public $__names = array( 0 => 'CONDITIONAL', 1 => 'COPY', @@ -127,6 +128,7 @@ final class StageType { 9 => 'STATS', 10 => 'DEPENDENCY_COLLECTION', 11 => 'COLUMNSTATS', + 12 => 'REPLDUMP', ); } diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 207395913f..9e29129896 100644 --- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -160,6 +160,7 @@ class StageType: STATS = 9 DEPENDENCY_COLLECTION = 10 COLUMNSTATS = 11 + REPLDUMP = 12 _VALUES_TO_NAMES = { 0: "CONDITIONAL", @@ -174,6 +175,7 @@ class StageType: 9: "STATS", 10: "DEPENDENCY_COLLECTION", 11: "COLUMNSTATS", + 12: "REPLDUMP", } _NAMES_TO_VALUES = { @@ -189,6 +191,7 @@ class StageType: "STATS": 9, "DEPENDENCY_COLLECTION": 10, "COLUMNSTATS": 11, + "REPLDUMP": 12, } diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb index f8b40344cb..1433d4a862 100644 --- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -72,8 +72,9 @@ module StageType STATS = 9 DEPENDENCY_COLLECTION = 10 COLUMNSTATS = 11 - VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS"} - VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS]).freeze + REPLDUMP = 12 + VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPLDUMP"} + VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPLDUMP]).freeze end class Adjacency diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index d61a4607ea..94d6c5a327 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask; @@ -108,7 +110,7 @@ public TaskTuple(Class workClass, Class> taskClass) { IndexMetadataChangeTask.class)); taskvec.add(new TaskTuple(TezWork.class, TezTask.class)); taskvec.add(new TaskTuple(SparkWork.class, SparkTask.class)); - + taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class)); } private static ThreadLocal tid = new ThreadLocal() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java new file mode 100644 index 0000000000..f9bdff8381 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -0,0 +1,347 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.repl; + +import com.google.common.collect.Collections2; +import com.google.common.primitives.Ints; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.EventUtils; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; +import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; +import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; +import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.InvalidTableException; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; +import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler; +import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; + +public class ReplDumpTask extends Task implements Serializable { + private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; + private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; + private static final String FUNCTION_METADATA_DIR_NAME = "_metadata"; + + private final static String TMP_TABLE_PREFIX = + SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase(); + + private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class); + private Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState"); + + @Override + public String getName() { + return "REPL_DUMP"; + } + + @Override + protected int execute(DriverContext driverContext) { + try { + Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir()); + DumpMetaData dmd = new DumpMetaData(dumpRoot, conf); + Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); + Long lastReplId; + if (work.isBootStrapDump()) { + lastReplId = bootStrapDump(dumpRoot, dmd, cmRoot); + } else { + lastReplId = incrementalDump(dumpRoot, dmd, cmRoot); + } + prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema); + } catch (Exception 
e) {
+      LOG.error("failed", e);
+      return 1;
+    }
+    return 0;
+  }
+
+  private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
+    LOG.debug("prepareReturnValues : " + schema);
+    for (String s : values) {
+      LOG.debug(" > " + s);
+    }
+    Utils.writeOutput(values, new Path(work.resultTempPath), conf);
+  }
+
+  private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
+    Long lastReplId;// get list of events matching dbPattern & tblPattern
+    // go through each event, and dump out each event to an event-level dump dir inside dumproot
+
+    // TODO : instead of simply restricting by message format, we should eventually
+    // move to a jdbc-driver-style registering of message format, and picking message
+    // factory per event to decode. For now, however, since all messages have the
+    // same factory, restricting by message format is effectively a guard against
+    // older leftover data that would cause us problems.
+
+    work.overrideEventTo(getHive());
+
+    IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
+        new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern),
+        new EventBoundaryFilter(work.eventFrom, work.eventTo),
+        new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));
+
+    EventUtils.MSClientNotificationFetcher evFetcher
+        = new EventUtils.MSClientNotificationFetcher(getHive().getMSC());
+
+    EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
+        evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);
+
+    lastReplId = work.eventTo;
+    String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty())
+        ? work.dbNameOrPattern
+        : "?";
+    REPL_STATE_LOG
+        .info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: INCREMENTAL", dbName);
+    while (evIter.hasNext()) {
+      NotificationEvent ev = evIter.next();
+      lastReplId = ev.getEventId();
+      Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
+      dumpEvent(ev, evRoot, cmRoot);
+    }
+
+    REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}", dbName);
+
+    LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
+    Utils.writeOutput(
+        Arrays.asList(
+            "incremental",
+            String.valueOf(work.eventFrom),
+            String.valueOf(lastReplId)
+        ),
+        dmd.getDumpFilePath(), conf);
+    dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot);
+    dmd.write();
+    return lastReplId;
+  }
+
+  private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
+    EventHandler.Context context = new EventHandler.Context(
+        evRoot,
+        cmRoot,
+        getHive(),
+        conf,
+        getNewEventOnlyReplicationSpec(ev.getEventId())
+    );
+    EventHandlerFactory.handlerFor(ev).handle(context);
+    REPL_STATE_LOG.info(
+        "Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and data to path {}",
+        String.valueOf(ev.getEventId()), ev.getEventType(), evRoot.toUri().toString());
+  }
+
+  private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException {
+    return getNewReplicationSpec(eventId.toString(), eventId.toString());
+  }
+
+  private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
+    // bootstrap case
+    Long bootDumpBeginReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
+    for (String dbName : matchesDb()) {
+      REPL_STATE_LOG
+          .info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP",
+              dbName);
+      LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
+
+      Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
+      dumpFunctionMetadata(dbName, dumpRoot);
+      for (String tblName : matchesTbl(dbName, work.tableNameOrPattern)) {
+        LOG.debug(
+            "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
+        dumpTable(dbName, tblName, dbRoot);
+      }
+    }
+    Long bootDumpEndReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
+    LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId,
+        bootDumpEndReplId);
+
+    // Now that bootstrap has dumped all objects related, we have to account for the changes
+    // that occurred while bootstrap was happening - i.e. we have to look through all events
+    // during the bootstrap period and consolidate them with our dump.
+
+    IMetaStoreClient.NotificationFilter evFilter =
+        new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern);
+    EventUtils.MSClientNotificationFetcher evFetcher =
+        new EventUtils.MSClientNotificationFetcher(getHive().getMSC());
+    EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
+        evFetcher, bootDumpBeginReplId,
+        Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1,
+        evFilter);
+
+    // Now we consolidate all the events that happened during the objdump into the objdump
+    while (evIter.hasNext()) {
+      NotificationEvent ev = evIter.next();
+      Path eventRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
+      // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
+    }
+    LOG.info(
+        "Consolidation done, preparing to return {},{}->{}",
+        dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
+    dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
+    dmd.write();
+
+    // Set the correct last repl id to return to the user
+    return bootDumpEndReplId;
+  }
+
+  private Iterable<String> matchesDb() throws HiveException {
+    if (work.dbNameOrPattern == null) {
+      return getHive().getAllDatabases();
+    } else {
+      return getHive().getDatabasesByPattern(work.dbNameOrPattern);
+    }
+  }
+
+  private Iterable<String> matchesTbl(String dbName, String tblPattern)
+      throws HiveException {
+    Hive db = Hive.get();
+    if (tblPattern == null) {
+      return Collections2.filter(db.getAllTables(dbName),
+          tableName -> {
+            assert tableName != null;
+            return !tableName.toLowerCase().startsWith(TMP_TABLE_PREFIX);
+          });
+    } else {
+      return db.getTablesByPattern(dbName, tblPattern);
+    }
+  }
+
+  private Path dumpDbMetadata(String dbName, Path dumpRoot) throws Exception {
+    Path dbRoot = new Path(dumpRoot, dbName);
+    // TODO : instantiating FS objects is generally costly. Refactor
+    FileSystem fs = dbRoot.getFileSystem(conf);
+    Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
+    HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName).database();
+    EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
+    REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata");
+    return dbRoot;
+  }
+
+  private void dumpTable(String dbName, String tblName, Path dbRoot) throws Exception {
+    try {
+      Hive db = getHive();
+      TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
+      TableExport.Paths exportPaths =
+          new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf);
+      new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).run();
+      REPL_STATE_LOG.info(
+          "Repl Dump: Analyzed dump for table/view: {}.{} and dumping metadata and data to path {}",
+          dbName, tblName, exportPaths.exportRootDir.toString());
+    } catch (InvalidTableException te) {
+      // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
+      // Just log a debug message and skip it.
+      LOG.debug(te.getMessage());
+    }
+  }
+
+  private ReplicationSpec getNewReplicationSpec() throws TException {
+    ReplicationSpec rspec = getNewReplicationSpec("replv2", "will-be-set");
+    rspec.setCurrentReplicationState(String.valueOf(getHive().getMSC()
+        .getCurrentNotificationEventId().getEventId()));
+    return rspec;
+  }
+
+  private ReplicationSpec getNewReplicationSpec(String evState, String objState) {
+    return new ReplicationSpec(true, false, evState, objState, false, true, true);
+  }
+
+  private String getNextDumpDir() {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // make it easy to write .q unit tests, instead of unique id generation.
+      // however, this does mean that in writing tests, we have to be aware that
+      // repl dump will clash with prior dumps, and thus have to clean up properly.
+      if (work.testInjectDumpDir == null) {
+        return "next";
+      } else {
+        return work.testInjectDumpDir;
+      }
+    } else {
+      return String.valueOf(System.currentTimeMillis());
+      // TODO: time good enough for now - we'll likely improve this.
+      // We may also work in something the equivalent of pid, thread id and move to nanos to ensure
+      // uniqueness.
+    }
+  }
+
+  private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception {
+    Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
+    List<String> functionNames = getHive().getFunctions(dbName, "*");
+    for (String functionName : functionNames) {
+      HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName);
+      if (tuple == null) {
+        continue;
+      }
+      Path functionRoot = new Path(functionsRoot, functionName);
+      Path functionMetadataRoot = new Path(functionRoot, FUNCTION_METADATA_DIR_NAME);
+      try (JsonWriter jsonWriter =
+          new JsonWriter(functionMetadataRoot.getFileSystem(conf), functionMetadataRoot)) {
+        FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf);
+        serializer.writeTo(jsonWriter, tuple.replicationSpec);
+      }
+      REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName);
+    }
+  }
+
+  private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName) {
+    try {
+      HiveWrapper.Tuple<Function> tuple = new HiveWrapper(getHive(), dbName).function(functionName);
+      if (tuple.object.getResourceUris().isEmpty()) {
+        REPL_STATE_LOG.warn(
+            "Not replicating function: " + functionName + " as it seems to have been created "
+                + "without USING clause");
+        return null;
+      }
+      return tuple;
+    } catch (HiveException e) {
+      // This can happen if a user drops the function between our getFunctions() listing and the
+      // attempt to fetch the function itself, in which case the fetch will fail.
+ LOG.info("Function " + functionName + + " could not be found, we are ignoring it as it can be a valid state ", e); + return null; + } + } + + @Override + public StageType getType() { + return StageType.REPLDUMP; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java new file mode 100644 index 0000000000..1f32be9f09 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -0,0 +1,75 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +import com.google.common.primitives.Ints; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +@Explain(displayName = "Replication Dump Operator", explainLevels = { Explain.Level.USER, + Explain.Level.DEFAULT, + Explain.Level.EXTENDED }) +public class ReplDumpWork implements Serializable { + final String dbNameOrPattern, tableNameOrPattern, astRepresentationForErrorMsg, resultTempPath; + final Long eventFrom; + Long eventTo; + private Integer maxEventLimit; + static String testInjectDumpDir = null; + + public static void injectNextDumpDirForTest(String dumpDir) { + testInjectDumpDir = dumpDir; + } + + public ReplDumpWork(String dbNameOrPattern, String tableNameOrPattern, + Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit, + String resultTempPath) { + this.dbNameOrPattern = dbNameOrPattern; + this.tableNameOrPattern = tableNameOrPattern; + this.eventFrom = eventFrom; + this.eventTo = eventTo; + this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; + this.maxEventLimit = maxEventLimit; + this.resultTempPath = resultTempPath; + } + + boolean isBootStrapDump() { + return eventFrom == null; + } + + int maxEventLimit() throws Exception { + if (eventTo < eventFrom) { + throw new Exception("Invalid event ID input received in TO clause"); + } + Integer maxRange = Ints.checkedCast(this.eventTo - eventFrom + 1); + if ((maxEventLimit == null) || (maxEventLimit > maxRange)) { + maxEventLimit = maxRange; + } + return maxEventLimit; + } + + void overrideEventTo(Hive fromDb) throws Exception { + if (eventTo == null) { + eventTo = fromDb.getMSC().getCurrentNotificationEventId().getEventId(); + LoggerFactory.getLogger(this.getClass()) + .debug("eventTo not specified, using current event id : {}", eventTo); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index fdf6c3c8d1..0932dff283 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -19,6 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one package org.apache.hadoop.hive.ql.parse; import org.antlr.runtime.tree.Tree; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; @@ -70,7 +71,8 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { // initialize export path String tmpPath = stripQuotes(toTree.getText()); // All parsing is done, we're now good to start the export process. - TableExport.Paths exportPaths = new TableExport.Paths(ast, tmpPath, conf); + TableExport.Paths exportPaths = + new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf); TableExport.AuthEntities authEntities = new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).run(); inputs.addAll(authEntities.inputs); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 3d0c73649f..48d9c945c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -19,40 +19,22 @@ Licensed to the Apache Software Foundation (ASF) under one import com.google.common.base.Predicate; import com.google.common.collect.Collections2; -import com.google.common.primitives.Ints; import org.antlr.runtime.tree.Tree; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.EventUtils; -import org.apache.hadoop.hive.metastore.messaging.MessageFactory; -import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; -import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; -import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; -import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.repl.DumpType; -import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; -import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; -import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler; -import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory; -import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; -import 
org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; @@ -101,7 +83,6 @@ Licensed to the Apache Software Foundation (ASF) under one private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; - private static final String FUNCTION_METADATA_DIR_NAME = "_metadata"; private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState"); ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { @@ -180,117 +161,19 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern) + "." + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to " + String.valueOf(eventTo) + " maxEventLimit " + String.valueOf(maxEventLimit)); - String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR); - Path dumpRoot = new Path(replRoot, getNextDumpDir()); - DumpMetaData dmd = new DumpMetaData(dumpRoot, conf); - Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); - Long lastReplId; try { - if (eventFrom == null){ - // bootstrap case - Long bootDumpBeginReplId = db.getMSC().getCurrentNotificationEventId().getEventId(); - for (String dbName : matchesDb(dbNameOrPattern)) { - REPL_STATE_LOG.info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP", dbName); - LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName); - - Path dbRoot = dumpDbMetadata(dbName, dumpRoot); - dumpFunctionMetadata(dbName, dumpRoot); - for (String tblName : matchesTbl(dbName, tblNameOrPattern)) { - LOG.debug( - "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); - dumpTable(ast, dbName, tblName, dbRoot); - } - REPL_STATE_LOG.info("Repl Dump: Completed analyzing Repl Dump for DB: {} and created {} COPY tasks to dump " + - "metadata and data", - dbName, rootTasks.size()); - } - Long bootDumpEndReplId = db.getMSC().getCurrentNotificationEventId().getEventId(); - LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, bootDumpEndReplId); - - // Now that bootstrap has dumped all objects related, we have to account for the changes - // that occurred while bootstrap was happening - i.e. we have to look through all events - // during the bootstrap period and consolidate them with our dump. - - IMetaStoreClient.NotificationFilter evFilter = - new DatabaseAndTableFilter(dbNameOrPattern, tblNameOrPattern); - EventUtils.MSClientNotificationFetcher evFetcher = - new EventUtils.MSClientNotificationFetcher(db.getMSC()); - EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( - evFetcher, bootDumpBeginReplId, - Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1, - evFilter ); - - // Now we consolidate all the events that happenned during the objdump into the objdump - while (evIter.hasNext()){ - NotificationEvent ev = evIter.next(); - Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId())); - // FIXME : implement consolidateEvent(..) 
similar to dumpEvent(ev,evRoot) - } - LOG.info( - "Consolidation done, preparing to return {},{}->{}", - dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); - dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); - dmd.write(); - - // Set the correct last repl id to return to the user - lastReplId = bootDumpEndReplId; - } else { - // get list of events matching dbPattern & tblPattern - // go through each event, and dump out each event to a event-level dump dir inside dumproot - if (eventTo == null){ - eventTo = db.getMSC().getCurrentNotificationEventId().getEventId(); - LOG.debug("eventTo not specified, using current event id : {}", eventTo); - } else if (eventTo < eventFrom) { - throw new Exception("Invalid event ID input received in TO clause"); - } - - Integer maxRange = Ints.checkedCast(eventTo - eventFrom + 1); - if ((maxEventLimit == null) || (maxEventLimit > maxRange)){ - maxEventLimit = maxRange; - } - - // TODO : instead of simply restricting by message format, we should eventually - // move to a jdbc-driver-stype registering of message format, and picking message - // factory per event to decode. For now, however, since all messages have the - // same factory, restricting by message format is effectively a guard against - // older leftover data that would cause us problems. - - IMetaStoreClient.NotificationFilter evFilter = new AndFilter( - new DatabaseAndTableFilter(dbNameOrPattern, tblNameOrPattern), - new EventBoundaryFilter(eventFrom, eventTo), - new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat())); - - EventUtils.MSClientNotificationFetcher evFetcher - = new EventUtils.MSClientNotificationFetcher(db.getMSC()); - - EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( - evFetcher, eventFrom, maxEventLimit, evFilter); - - lastReplId = eventTo; - REPL_STATE_LOG.info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: INCREMENTAL", - (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?"); - while (evIter.hasNext()){ - NotificationEvent ev = evIter.next(); - lastReplId = ev.getEventId(); - Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId)); - dumpEvent(ev, evRoot, cmRoot); - } - - REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}", - (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? 
dbNameOrPattern : "?"); - - LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); - Utils.writeOutput( - Arrays.asList( - "incremental", - String.valueOf(eventFrom), - String.valueOf(lastReplId) - ), - dmd.getDumpFilePath(), conf); - dmd.setDump(DumpType.INCREMENTAL, eventFrom, lastReplId, cmRoot); - dmd.write(); - } - prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema); + ctx.setResFile(ctx.getLocalTmpPath()); + Task replDumpWorkTask = TaskFactory + .get(new ReplDumpWork( + dbNameOrPattern, + tblNameOrPattern, + eventFrom, + eventTo, + ErrorMsg.INVALID_PATH.getMsg(ast), + maxEventLimit, + ctx.getResFile().toUri().toString() + ), conf); + rootTasks.add(replDumpWorkTask); setFetchTask(createFetchTask(dumpSchema)); } catch (Exception e) { // TODO : simple wrap & rethrow for now, clean up with error codes @@ -299,119 +182,6 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { } } - private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception { - EventHandler.Context context = new EventHandler.Context( - evRoot, - cmRoot, - db, - conf, - getNewEventOnlyReplicationSpec(ev.getEventId()) - ); - EventHandlerFactory.handlerFor(ev).handle(context); - REPL_STATE_LOG.info("Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and data to path {}", - String.valueOf(ev.getEventId()), ev.getEventType(), evRoot.toUri().toString()); - } - - public static void injectNextDumpDirForTest(String dumpdir){ - testInjectDumpDir = dumpdir; - } - - private String getNextDumpDir() { - if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { - // make it easy to write .q unit tests, instead of unique id generation. - // however, this does mean that in writing tests, we have to be aware that - // repl dump will clash with prior dumps, and thus have to clean up properly. - if (testInjectDumpDir == null){ - return "next"; - } else { - return testInjectDumpDir; - } - } else { - return String.valueOf(System.currentTimeMillis()); - // TODO: time good enough for now - we'll likely improve this. - // We may also work in something the equivalent of pid, thrid and move to nanos to ensure - // uniqueness. - } - } - - /** - * - * @param dbName - * @param dumpRoot - * @return db dumped path - * @throws SemanticException - */ - private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticException { - Path dbRoot = new Path(dumpRoot, dbName); - try { - // TODO : instantiating FS objects are generally costly. 
Refactor - FileSystem fs = dbRoot.getFileSystem(conf); - Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME); - HiveWrapper.Tuple database = new HiveWrapper(db, dbName).database(); - inputs.add(new ReadEntity(database.object)); - EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec); - REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata"); - } catch (Exception e) { - // TODO : simple wrap & rethrow for now, clean up with error codes - throw new SemanticException(e); - } - return dbRoot; - } - - private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticException { - Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME); - try { - List functionNames = db.getFunctions(dbName, "*"); - for (String functionName : functionNames) { - HiveWrapper.Tuple tuple; - try { - tuple = new HiveWrapper(db, dbName).function(functionName); - } catch (HiveException e) { - //This can happen as we are querying the getFunctions before we are getting the actual function - //in between there can be a drop function by a user in which case our call will fail. - LOG.info("Function " + functionName + " could not be found, we are ignoring it as it can be a valid state ", e); - continue; - } - if (tuple.object.getResourceUris().isEmpty()) { - REPL_STATE_LOG.warn( - "Not replicating function: " + functionName + " as it seems to have been created " - + "without USING clause"); - continue; - } - - Path functionRoot = new Path(functionsRoot, functionName); - Path functionMetadataRoot = new Path(functionRoot, FUNCTION_METADATA_DIR_NAME); - try (JsonWriter jsonWriter = new JsonWriter(functionMetadataRoot.getFileSystem(conf), - functionMetadataRoot)) { - FunctionSerializer serializer = - new FunctionSerializer(tuple.object, conf); - serializer.writeTo(jsonWriter, tuple.replicationSpec); - } - REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName); - } - } catch (Exception e) { - throw new SemanticException(e); - } - } - - private void dumpTable(ASTNode ast, String dbName, String tblName, Path dbRoot) - throws SemanticException { - try { - TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null); - TableExport.Paths exportPaths = new TableExport.Paths(ast, dbRoot, tblName, conf); - new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).run(); - REPL_STATE_LOG.info("Repl Dump: Analyzed dump for table/view: {}.{} and created copy tasks to dump metadata " + - "and data to path {}", dbName, tblName, exportPaths.exportRootDir.toString()); - } catch (InvalidTableException te) { - // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. - // Just log a debug message and skip it. 
- LOG.debug(te.getMessage()); - } catch (HiveException e) { - // TODO : simple wrap & rethrow for now, clean up with error codes - throw new SemanticException(e); - } - } - // REPL LOAD private void initReplLoad(ASTNode ast) { int numChildren = ast.getChildCount(); @@ -953,53 +723,4 @@ private void prepareReturnValues(List values, String schema) throws Sema ctx.setResFile(ctx.getLocalTmpPath()); Utils.writeOutput(values, ctx.getResFile(), conf); } - - private ReplicationSpec getNewReplicationSpec() throws SemanticException { - try { - ReplicationSpec rspec = getNewReplicationSpec("replv2", "will-be-set"); - rspec.setCurrentReplicationState(String.valueOf(db.getMSC() - .getCurrentNotificationEventId().getEventId())); - return rspec; - } catch (Exception e) { - throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error codes - } - } - - // Use for specifying object state as well as event state - private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException { - return new ReplicationSpec(true, false, evState, objState, false, true, true); - } - - // Use for replication states focused on event only, where the obj state will be the event state - private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException { - return getNewReplicationSpec(eventId.toString(), eventId.toString()); - } - - private Iterable matchesTbl(String dbName, String tblPattern) - throws HiveException { - if (tblPattern == null) { - return removeValuesTemporaryTables(db.getAllTables(dbName)); - } else { - return db.getTablesByPattern(dbName, tblPattern); - } - } - - private final static String TMP_TABLE_PREFIX = - SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase(); - - static Iterable removeValuesTemporaryTables(List tableNames) { - return Collections2.filter(tableNames, - tableName -> { - assert tableName != null; - return !tableName.toLowerCase().startsWith(TMP_TABLE_PREFIX); - }); - } - - private Iterable matchesDb(String dbPattern) throws HiveException { - if (dbPattern == null) { - return db.getAllDatabases(); - } else { - return db.getDatabasesByPattern(dbPattern); - } - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 144d667f9b..9f22f230b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -28,7 +28,6 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; -import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; @@ -62,6 +61,9 @@ public TableExport(Paths paths, TableSpec tableSpec, ? 
null : tableSpec; this.replicationSpec = replicationSpec; + if (this.tableSpec != null && this.tableSpec.tableHandle.isView()) { + this.replicationSpec.setIsMetadataOnly(true); + } this.db = db; this.conf = conf; this.logger = logger; @@ -72,6 +74,7 @@ public AuthEntities run() throws SemanticException { if (tableSpec == null) { writeMetaData(null); } else if (shouldExport()) { + //first we should get the correct replication spec before doing metadata/data export if (tableSpec.tableHandle.isView()) { replicationSpec.setIsMetadataOnly(true); } @@ -111,7 +114,8 @@ private PartitionIterable partitions() throws SemanticException { } } - private void writeMetaData(PartitionIterable partitions) throws SemanticException { + private void writeMetaData(PartitionIterable partitions) + throws SemanticException { try { EximUtil.createExportDump( paths.exportFileSystem, @@ -168,13 +172,14 @@ private boolean shouldExport() throws SemanticException { * directory creation. */ public static class Paths { - private final ASTNode ast; + private final String astRepresentationForErrorMsg; private final HiveConf conf; public final Path exportRootDir; private final FileSystem exportFileSystem; - public Paths(ASTNode ast, Path dbRoot, String tblName, HiveConf conf) throws SemanticException { - this.ast = ast; + public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName, + HiveConf conf) throws SemanticException { + this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; this.conf = conf; Path tableRoot = new Path(dbRoot, tblName); URI exportRootDir = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString()); @@ -190,8 +195,9 @@ public Paths(ASTNode ast, Path dbRoot, String tblName, HiveConf conf) throws Sem } } - public Paths(ASTNode ast, String path, HiveConf conf) throws SemanticException { - this.ast = ast; + public Paths(String astRepresentationForErrorMsg, String path, HiveConf conf) + throws SemanticException { + this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; this.conf = conf; this.exportRootDir = new Path(EximUtil.getValidatedURI(conf, path)); try { @@ -245,21 +251,21 @@ private void validateTargetDir(URI rootDirExportFile) throws SemanticException { FileStatus tgt = fs.getFileStatus(toPath); // target exists if (!tgt.isDirectory()) { - throw new SemanticException(ErrorMsg.INVALID_PATH - .getMsg(ast, "Target is not a directory : " + rootDirExportFile)); + throw new SemanticException( + astRepresentationForErrorMsg + ": " + "Target is not a directory : " + + rootDirExportFile); } else { FileStatus[] files = fs.listStatus(toPath, FileUtils.HIDDEN_FILES_PATH_FILTER); if (files != null && files.length != 0) { throw new SemanticException( - ErrorMsg.INVALID_PATH - .getMsg(ast, "Target is not an empty directory : " + rootDirExportFile) - ); + astRepresentationForErrorMsg + ": " + "Target is not an empty directory : " + + rootDirExportFile); } } } catch (FileNotFoundException ignored) { } } catch (IOException e) { - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast), e); + throw new SemanticException(astRepresentationForErrorMsg, e); } } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java index 1cb4470b0c..17cf4d04b3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java @@ -1,33 +1,28 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ package org.apache.hadoop.hive.ql.parse; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.hasItems; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.*; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import com.google.common.collect.ImmutableList; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -45,14 +40,15 @@ import org.junit.BeforeClass; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + public class TestReplicationSemanticAnalyzer { - static QueryState queryState; + private static QueryState queryState; static HiveConf conf; - static String defaultDB = "default"; - static String tblName = "testReplSA"; - static ArrayList cols = new ArrayList(Arrays.asList("col1", "col2")); - ParseDriver pd; - SemanticAnalyzer sA; + private static String defaultDB = "default"; + private static String tblName = "testReplSA"; + private static ArrayList cols = new ArrayList(Arrays.asList("col1", "col2")); @BeforeClass public static void initialize() throws HiveException { @@ -221,12 +217,6 @@ public void testReplLoadParse() throws Exception { assertEquals(child.getChildCount(), 0); } - // TODO: add this test after repl dump analyze generates tasks - //@Test - public void testReplDumpAnalyze() throws Exception { - - } - //@Test public void testReplLoadAnalyze() throws Exception { ParseDriver pd = new ParseDriver(); @@ -274,19 +264,4 @@ public void testReplStatusAnalyze() throws Exception { FetchTask fetchTask = rs.getFetchTask(); assertNotNull(fetchTask); } - - @Test - public void removeTemporaryTablesForMetadataDump() { - List validTables = ImmutableList.copyOf( - ReplicationSemanticAnalyzer.removeValuesTemporaryTables(new ArrayList() {{ - 
add(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX + "a");
-          add(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX + "b");
-          add(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX + "c");
-          add("c");
-          add("b");
-          add("a");
-        }}));
-    assertThat(validTables.size(), is(equalTo(3)));
-    assertThat(validTables, hasItems("a", "b", "c"));
-  }
 }
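
Reviewer note (illustration only, not part of the patch): the sketch below shows how the pieces introduced above are expected to fit together. A ReplDumpWork is built with the same constructor arguments ReplicationSemanticAnalyzer now passes, TaskFactory resolves it to a ReplDumpTask through the new TaskTuple registration, and tests pin the dump directory with ReplDumpWork.injectNextDumpDirForTest(). The database pattern, result path, and test dump-dir literals are made up for the example.

// Sketch only, under the assumptions above; not meant to run outside a Hive session.
import java.io.Serializable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;

public class ReplDumpWiringSketch {

  public static Task<? extends Serializable> buildBootstrapDumpTask(HiveConf conf) {
    // eventFrom == null makes ReplDumpWork.isBootStrapDump() return true, so
    // ReplDumpTask.execute() takes the bootstrap path; a non-null eventFrom
    // would select the incremental path instead.
    ReplDumpWork work = new ReplDumpWork(
        "repl_db",                // dbNameOrPattern (hypothetical)
        null,                     // tableNameOrPattern: all tables
        null,                     // eventFrom: null => bootstrap dump
        null,                     // eventTo
        "REPL DUMP repl_db",      // astRepresentationForErrorMsg; the analyzer passes ErrorMsg.INVALID_PATH.getMsg(ast)
        null,                     // maxEventLimit
        "/tmp/repl_dump_result"); // resultTempPath; the analyzer passes ctx.getResFile()

    // TaskFactory now maps ReplDumpWork to ReplDumpTask via the new TaskTuple entry.
    return TaskFactory.get(work, conf);
  }

  public static void pinDumpDirForTests() {
    // Matches the updated test helpers: with HiveConf.ConfVars.HIVE_IN_TEST enabled,
    // getNextDumpDir() returns this value instead of System.currentTimeMillis().
    ReplDumpWork.injectNextDumpDirForTest("1");
  }
}

The design point of the patch is visible here: the dump logic is no longer executed inside the semantic analyzer; it is described by a Serializable work object and run by the driver as a regular stage, which is what the new REPLDUMP StageType added to queryplan.thrift represents.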