diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TableType.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TableType.java index e9e16d7..962dd81 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/TableType.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TableType.java @@ -22,5 +22,19 @@ * Typesafe enum for types of tables described by the metastore. */ public enum TableType { - MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW, INDEX_TABLE, MATERIALIZED_VIEW + MANAGED_TABLE ("MANAGED_TABLE"), + EXTERNAL_TABLE("EXTERNAL_TABLE"), + VIRTUAL_VIEW("VIRTUAL_VIEW"), + INDEX_TABLE("INDEX_TABLE"), + MATERIALIZED_VIEW("MATERIALIZED_VIEW"); + + String type = null; + TableType(String type) { + this.type = type; + } + + @Override + public String toString() { + return type; + } } diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift index 00b0200..aaf644a 100644 --- a/ql/if/queryplan.thrift +++ b/ql/if/queryplan.thrift @@ -102,6 +102,7 @@ enum StageType { COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, + REPL_STATE_LOG } struct Stage { diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index f467da2..7262017 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -117,7 +117,8 @@ int _kStageTypeValues[] = { StageType::DEPENDENCY_COLLECTION, StageType::COLUMNSTATS, StageType::REPL_DUMP, - StageType::REPL_BOOTSTRAP_LOAD + StageType::REPL_BOOTSTRAP_LOAD, + StageType::REPL_STATE_LOG }; const char* _kStageTypeNames[] = { "CONDITIONAL", @@ -133,9 +134,10 @@ const char* _kStageTypeNames[] = { "DEPENDENCY_COLLECTION", "COLUMNSTATS", "REPL_DUMP", - "REPL_BOOTSTRAP_LOAD" + "REPL_BOOTSTRAP_LOAD", + "REPL_STATE_LOG" }; -const std::map _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(14, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map 
_StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(15, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); Adjacency::~Adjacency() throw() { diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h index ac87ef7..18dc867 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -95,7 +95,8 @@ struct StageType { DEPENDENCY_COLLECTION = 10, COLUMNSTATS = 11, REPL_DUMP = 12, - REPL_BOOTSTRAP_LOAD = 13 + REPL_BOOTSTRAP_LOAD = 13, + REPL_STATE_LOG = 14 }; }; diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index 11a8f6d..ed408d2 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -25,7 +25,8 @@ DEPENDENCY_COLLECTION(10), COLUMNSTATS(11), REPL_DUMP(12), - REPL_BOOTSTRAP_LOAD(13); + REPL_BOOTSTRAP_LOAD(13), + REPL_STATE_LOG(14); private final int value; @@ -74,6 +75,8 @@ public static StageType findByValue(int value) { return REPL_DUMP; case 13: return REPL_BOOTSTRAP_LOAD; + case 14: + return REPL_STATE_LOG; default: return null; } diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php index 68edfcd..bca2eee 100644 --- a/ql/src/gen/thrift/gen-php/Types.php +++ b/ql/src/gen/thrift/gen-php/Types.php @@ -116,6 +116,7 @@ final class StageType { const COLUMNSTATS = 11; const REPL_DUMP = 12; const REPL_BOOTSTRAP_LOAD = 13; + const REPL_STATE_LOG = 14; static public $__names = array( 0 => 'CONDITIONAL', 1 => 'COPY', @@ -131,6 +132,7 @@ final class StageType { 11 => 'COLUMNSTATS', 12 => 'REPL_DUMP', 13 => 'REPL_BOOTSTRAP_LOAD', + 14 => 'REPL_STATE_LOG', ); } diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py 
b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 6bf65af..1f0d627 100644 --- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -162,6 +162,7 @@ class StageType: COLUMNSTATS = 11 REPL_DUMP = 12 REPL_BOOTSTRAP_LOAD = 13 + REPL_STATE_LOG = 14 _VALUES_TO_NAMES = { 0: "CONDITIONAL", @@ -178,6 +179,7 @@ class StageType: 11: "COLUMNSTATS", 12: "REPL_DUMP", 13: "REPL_BOOTSTRAP_LOAD", + 14: "REPL_STATE_LOG", } _NAMES_TO_VALUES = { @@ -195,6 +197,7 @@ class StageType: "COLUMNSTATS": 11, "REPL_DUMP": 12, "REPL_BOOTSTRAP_LOAD": 13, + "REPL_STATE_LOG": 14, } diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb index 2730dde..88d9c17 100644 --- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -74,8 +74,9 @@ module StageType COLUMNSTATS = 11 REPL_DUMP = 12 REPL_BOOTSTRAP_LOAD = 13 - VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD"} - VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD]).freeze + REPL_STATE_LOG = 14 + VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG"} + VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG]).freeze end class Adjacency diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 91ac4bf..fe9b624 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; @@ -114,6 +116,7 @@ public TaskTuple(Class workClass, Class> taskClass) { taskvec.add(new TaskTuple(SparkWork.class, SparkTask.class)); taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class)); taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class)); + taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class)); } private static ThreadLocal tid = new ThreadLocal() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 67a67fd..8bc2289 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -49,6 +49,9 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.log.logger.BootstrapDumpLogger; +import org.apache.hadoop.hive.ql.parse.repl.log.logger.IncrementalDumpLogger; +import org.apache.hadoop.hive.ql.parse.repl.log.logger.ReplLogger; import 
org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -64,7 +67,7 @@ Licensed to the Apache Software Foundation (ASF) under one private static final String FUNCTION_METADATA_FILE_NAME = "_metadata"; private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class); - private Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState"); + private ReplLogger replLogger; @Override public String getName() { @@ -127,8 +130,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty()) ? work.dbNameOrPattern : "?"; - REPL_STATE_LOG - .info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: INCREMENTAL", dbName); + replLogger = new IncrementalDumpLogger(dbName, (lastReplId - work.eventFrom)); + replLogger.startLog(); while (evIter.hasNext()) { NotificationEvent ev = evIter.next(); lastReplId = ev.getEventId(); @@ -136,7 +139,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw dumpEvent(ev, evRoot, cmRoot); } - REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}", dbName); + replLogger.endLog(dumpRoot.toString(), lastReplId.toString()); LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); Utils.writeOutput( @@ -159,10 +162,9 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Ex conf, getNewEventOnlyReplicationSpec(ev.getEventId()) ); - EventHandlerFactory.handlerFor(ev).handle(context); - REPL_STATE_LOG.info( - "Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and data to path {}", - String.valueOf(ev.getEventId()), ev.getEventType(), evRoot.toUri().toString()); + EventHandler eventHandler = EventHandlerFactory.handlerFor(ev); + eventHandler.handle(context); + replLogger.eventLog(String.valueOf(ev.getEventId()), eventHandler.dumpType().toString()); } private ReplicationSpec 
getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException { @@ -174,12 +176,13 @@ private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws Sema private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception { // bootstrap case Long bootDumpBeginReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId(); + for (String dbName : Utils.matchesDb(getHive(), work.dbNameOrPattern)) { - REPL_STATE_LOG - .info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP", - dbName); LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName); - + replLogger = new BootstrapDumpLogger(dbName, + Utils.getAllTables(getHive(), dbName).size(), + getHive().getAllFunctions().size()); + replLogger.startLog(); Path dbRoot = dumpDbMetadata(dbName, dumpRoot); dumpFunctionMetadata(dbName, dumpRoot); for (String tblName : Utils.matchesTbl(getHive(), dbName, work.tableNameOrPattern)) { @@ -187,6 +190,7 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); dumpTable(dbName, tblName, dbRoot); } + replLogger.endLog(dumpRoot.toString(), bootDumpBeginReplId.toString()); } Long bootDumpEndReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId(); LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, @@ -228,7 +232,6 @@ private Path dumpDbMetadata(String dbName, Path dumpRoot) throws Exception { Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME); HiveWrapper.Tuple database = new HiveWrapper(getHive(), dbName).database(); EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec); - REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata"); return dbRoot; } @@ -240,9 +243,7 @@ private void dumpTable(String dbName, String tblName, Path dbRoot) throws Except new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, 
tblName, conf); String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); new TableExport(exportPaths, ts, getNewReplicationSpec(), db, distCpDoAsUser, conf).write(); - REPL_STATE_LOG.info( - "Repl Dump: Analyzed dump for table/view: {}.{} and dumping metadata and data to path {}", - dbName, tblName, exportPaths.exportRootDir.toString()); + replLogger.tableLog(tblName, ts.tableHandle.getTableType()); } catch (InvalidTableException te) { // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. // Just log a debug message and skip it. @@ -295,7 +296,7 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf); serializer.writeTo(jsonWriter, tuple.replicationSpec); } - REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName); + replLogger.functionLog(functionName); } } @@ -303,8 +304,7 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception try { HiveWrapper.Tuple tuple = new HiveWrapper(getHive(), dbName).function(functionName); if (tuple.object.getResourceUris().isEmpty()) { - REPL_STATE_LOG.warn( - "Not replicating function: " + functionName + " as it seems to have been created " + LOG.warn("Not replicating function: " + functionName + " as it seems to have been created " + "without USING clause"); return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java new file mode 100644 index 0000000..614af54 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.plan.api.StageType; + +import java.io.Serializable; + +/** + * ReplStateLogTask. + * + * Exists for the sole purpose of reducing the number of dependency edges in the task graph. + **/ +public class ReplStateLogTask extends Task implements Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public int execute(DriverContext driverContext) { + work.replStateLog(); + return 0; + } + + @Override + public StageType getType() { + return StageType.REPL_STATE_LOG; + } + + @Override + public String getName() { + return "REPL_STATE_LOG"; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java new file mode 100644 index 0000000..7e5cc1f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.repl.log.logger.ReplLogger; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + +import java.io.Serializable; + + +/** + * ReplStateLogWork + * + */ +@Explain(displayName = "Repl State Log", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) +public class ReplStateLogWork implements Serializable { + private static final long serialVersionUID = 1L; + private final ReplLogger replLogger; + private final LOG_TYPE logType; + private String eventId; + private String eventType; + private String tableName; + private TableType tableType; + private String functionName; + private String dumpDir; + private String lastReplId; + + private enum LOG_TYPE { + TABLE, + FUNCTION, + EVENT, + END + } + + public ReplStateLogWork(ReplLogger replLogger, String eventId, String eventType) { + this.logType = LOG_TYPE.EVENT; + this.replLogger = replLogger; + this.eventId = eventId; + this.eventType = eventType; + } + + public ReplStateLogWork(ReplLogger replLogger, String tableName, TableType tableType) { + this.logType = LOG_TYPE.TABLE; + this.replLogger = replLogger; + this.tableName = tableName; 
+ this.tableType = tableType; + } + + public ReplStateLogWork(ReplLogger replLogger, String functionName) { + this.logType = LOG_TYPE.FUNCTION; + this.replLogger = replLogger; + this.functionName = functionName; + } + + public ReplStateLogWork(ReplLogger replLogger, String dumpDir, Database db) { + this.logType = LOG_TYPE.END; + this.replLogger = replLogger; + this.dumpDir = dumpDir; + this.lastReplId = ReplicationSpec.getLastReplicatedStateFromParameters(db.getParameters()); + } + + public void replStateLog() { + switch (logType) { + case TABLE: { + replLogger.tableLog(tableName, tableType); + break; + } + case FUNCTION: { + replLogger.functionLog(functionName); + break; + } + case EVENT: { + replLogger.eventLog(eventId, eventType); + break; + } + case END: { + replLogger.endLog(dumpDir, lastReplId); + break; + } + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index 6ea1754..bebbb12 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -18,9 +18,12 @@ Licensed to the Apache Software Foundation (ASF) under one package org.apache.hadoop.hive.ql.exec.repl.bootstrap; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; @@ -35,6 +38,8 @@ Licensed to the Apache Software Foundation (ASF) under one import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.log.logger.ReplLogger; +import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; import java.io.Serializable; @@ -131,6 +136,13 @@ a database ( directory ) partitionsTracker); tableTracker.debugLog("table"); partitionsTracker.debugLog("partitions for table"); + + if (!loadTaskTracker.hasReplicationState()) { + // Add ReplStateLogTask only if no pending table load tasks left for next cycle + ImportTableDesc tableDesc = loadPartitions.tableDesc(); + loadTaskTracker.update(createTableReplLogTask(tableTracker.tasks(), maxTasks, + iterator.replLogger(), tableDesc.getTableName(), tableDesc.tableType())); + } break; } case Partition: { @@ -153,6 +165,13 @@ a database ( directory ) partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, partitionsTracker); partitionsTracker.debugLog("partitions"); + + if (!loadTaskTracker.hasReplicationState()) { + // Add ReplStateLogTask only if no pending table load tasks left for next cycle + ImportTableDesc tableDesc = loadPartitions.tableDesc(); + loadTaskTracker.update(createTableReplLogTask(partitionsTracker.tasks(), maxTasks, + iterator.replLogger(), tableDesc.getTableName(), tableDesc.tableType())); + } break; } case Function: { @@ -166,14 +185,24 @@ a database ( directory ) } loadTaskTracker.update(functionsTracker); functionsTracker.debugLog("functions"); + + // Add ReplStateLogTask as function load tasks are always fully added in current cycle + loadTaskTracker.update(createFunctionReplLogTask(functionsTracker.tasks(), maxTasks, + iterator.replLogger(), loadFunction.functionName())); break; } } + + if (!iterator.currentDbHasNext()) { + loadTaskTracker.update(createEndReplLogTask(maxTasks, context, scope, + 
iterator.replLogger(), iterator.dumpDirectory())); + } } boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState(); createBuilderTask(scope.rootTasks, addAnotherLoadTask); if (!iterator.hasNext()) { loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); + work.updateDbEventState(null); } this.childTasks = scope.rootTasks; LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks()); @@ -186,6 +215,46 @@ a database ( directory ) return 0; } + private TaskTracker createTableReplLogTask(List> tableTasks, + int maxTasks, + ReplLogger replLogger, String tableName, TableType tableType) { + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableName, tableType); + Task replLogTask = TaskFactory.get(replLogWork, conf); + dependency(tableTasks, replLogTask); + + TaskTracker tracker = new TaskTracker(maxTasks); + tracker.addTask(replLogTask); + return tracker; + } + + private TaskTracker createFunctionReplLogTask(List> functionTasks, + int maxTasks, + ReplLogger replLogger, String functionName) { + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, functionName); + Task replLogTask = TaskFactory.get(replLogWork, conf); + dependency(functionTasks, replLogTask); + + TaskTracker tracker = new TaskTracker(maxTasks); + tracker.addTask(replLogTask); + return tracker; + } + + private TaskTracker createEndReplLogTask(int maxTasks, Context context, Scope scope, + ReplLogger replLogger, String dumpDir) throws SemanticException { + Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn); + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dumpDir, dbInMetadata); + Task replLogTask = TaskFactory.get(replLogWork, conf); + if (null == scope.rootTasks) { + scope.rootTasks.add(replLogTask); + } else { + dependency(scope.rootTasks, replLogTask); + } + + TaskTracker tracker = new TaskTracker(maxTasks); + 
tracker.addTask(replLogTask); + return tracker; + } + /** * There was a database update done before and we want to make sure we update the last repl * id on this database as we are now going to switch to processing a new database. @@ -243,18 +312,20 @@ private void createBuilderTask(List> rootTasks, /** * add the dependency to the leaf node */ - private boolean dependency(List> tasks, - Task loadTask) { + private boolean dependency(List> tasks, Task tailTask) { if (tasks == null || tasks.isEmpty()) { return true; } for (Task task : tasks) { - boolean dependency = dependency(task.getChildTasks(), loadTask); - if (dependency) { - task.addDependentTask(loadTask); + if (task == tailTask) { + continue; + } + boolean leafNode = dependency(task.getChildTasks(), tailTask); + if (leafNode) { + task.addDependentTask(tailTask); } } - return true; + return false; } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java index eb18e5f..f51afe1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java @@ -38,7 +38,7 @@ Licensed to the Apache Software Foundation (ASF) under one public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, String tableNameToLoadIn) throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; - this.iterator = new BootstrapEventsIterator(dumpDirectory, hiveConf); + this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); this.dbNameToLoadIn = dbNameToLoadIn; } @@ -61,7 +61,6 @@ void updateDbEventState(DatabaseEvent.State state) { DatabaseEvent databaseEvent(HiveConf hiveConf) { DatabaseEvent databaseEvent = state.toEvent(hiveConf); - state = null; return databaseEvent; } diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java index 4e635ad..dde72ae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java @@ -17,13 +17,15 @@ Licensed to the Apache Software Foundation (ASF) under one */ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.*; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.repl.log.logger.BootstrapLoadLogger; +import org.apache.hadoop.hive.ql.parse.repl.log.logger.ReplLogger; import java.io.IOException; import java.util.Arrays; @@ -69,8 +71,13 @@ Licensed to the Apache Software Foundation (ASF) under one warehouse. 
*/ private Iterator dbEventsIterator; + private final String dumpDirectory; + private final String dbNameToLoadIn; + private final HiveConf hiveConf; + private ReplLogger replLogger; - public BootstrapEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException { + public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, HiveConf hiveConf) + throws IOException { Path path = new Path(dumpDirectory); FileSystem fileSystem = path.getFileSystem(hiveConf); FileStatus[] fileStatuses = @@ -93,6 +100,9 @@ public BootstrapEventsIterator(String dumpDirectory, HiveConf hiveConf) throws I } }).collect(Collectors.toList()).iterator(); + this.dumpDirectory = dumpDirectory; + this.dbNameToLoadIn = dbNameToLoadIn; + this.hiveConf = hiveConf; } @Override @@ -101,6 +111,7 @@ public boolean hasNext() { if (currentDatabaseIterator == null) { if (dbEventsIterator.hasNext()) { currentDatabaseIterator = dbEventsIterator.next(); + initReplLogger(); } else { return false; } @@ -127,7 +138,48 @@ public void forEachRemaining(Consumer action) { throw new UnsupportedOperationException("This operation is not supported"); } + public boolean currentDbHasNext() { + return ((currentDatabaseIterator != null) && (currentDatabaseIterator.hasNext())); + } + public void setReplicationState(ReplicationState replicationState) { this.currentDatabaseIterator.replicationState = replicationState; } + + public ReplLogger replLogger() { + return replLogger; + } + + public String dumpDirectory() { + return dumpDirectory; + } + + private void initReplLogger() { + try { + Path dbDumpPath = currentDatabaseIterator.dbLevelPath(); + FileSystem fs = dbDumpPath.getFileSystem(hiveConf); + + long numTables = getSubDirs(fs, dbDumpPath).length; + long numFunctions = 0; + Path funcPath = new Path(dbDumpPath, ReplicationSemanticAnalyzer.FUNCTIONS_ROOT_DIR_NAME); + if (fs.exists(funcPath)) { + numFunctions = getSubDirs(fs, funcPath).length; + } + String dbName = 
StringUtils.isBlank(dbNameToLoadIn) ? dbDumpPath.getName() : dbNameToLoadIn; + replLogger = new BootstrapLoadLogger(dbName, dumpDirectory, numTables, numFunctions); + replLogger.startLog(); + } catch (IOException e) { + // Ignore the exception + } + } + + FileStatus[] getSubDirs(FileSystem fs, Path dirPath) throws IOException { + return fs.listStatus(dirPath, new PathFilter() { + @Override + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java index 3100875..dc0e192 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java @@ -58,6 +58,10 @@ Licensed to the Apache Software Foundation (ASF) under one remoteIterator = fileSystem.listFiles(dbLevelPath, true); } + public Path dbLevelPath() { + return this.dbLevelPath; + } + @Override public boolean hasNext() { try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java index e9b8711..94cbef4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java @@ -41,6 +41,7 @@ Licensed to the Apache Software Foundation (ASF) under one private Context context; private final FunctionEvent event; private final String dbNameToLoadIn; + private String functionName; private final TaskTracker tracker; public LoadFunction(Context context, FunctionEvent event, String dbNameToLoadIn, @@ -51,6 +52,10 @@ public LoadFunction(Context 
context, FunctionEvent event, String dbNameToLoadIn, this.tracker = new TaskTracker(existingTracker); } + public String functionName() { + return functionName; + } + public TaskTracker tasks() throws IOException, SemanticException { URI fromURI = EximUtil .getValidatedURI(context.hiveConf, stripQuotes(event.rootDir().toUri().toString())); @@ -63,6 +68,7 @@ public TaskTracker tasks() throws IOException, SemanticException { dbNameToLoadIn, null, fromPath.toString(), null, null, context.hiveConf, context.hiveDb, null, LOG) ); + this.functionName = handler.getFunctionName(); tasks.forEach(tracker::addTask); return tracker; } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index f088ba9..3be1bec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -98,6 +98,10 @@ private String location() throws MetaException, HiveException { } } + public ImportTableDesc tableDesc() { + return tableDesc; + } + public TaskTracker tasks() throws SemanticException { try { /* @@ -149,9 +153,11 @@ private TaskTracker forNewTable() throws Exception { while (iterator.hasNext() && tracker.canAddMoreTasks()) { AddPartitionDesc addPartitionDesc = iterator.next(); tracker.addTask(addSinglePartition(table, addPartitionDesc)); - ReplicationState currentReplicationState = - new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc)); - updateReplicationState(currentReplicationState); + if (iterator.hasNext()) { + ReplicationState currentReplicationState = + new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc)); + updateReplicationState(currentReplicationState); + } } return tracker; } @@ -236,13 +242,16 @@ private TaskTracker 
forExistingTable(AddPartitionDesc lastPartitionReplicated) t boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated == null); ReplicationSpec replicationSpec = event.replicationSpec(); LOG.debug("table partitioned"); - for (AddPartitionDesc addPartitionDesc : event.partitionDescriptions(tableDesc)) { + + Iterator iterator = event.partitionDescriptions(tableDesc).iterator(); + while (iterator.hasNext()) { /* encounteredTheLastReplicatedPartition will be set, when we break creation of partition tasks for a table, as we have reached the limit of number of tasks we should create for execution. in this case on the next run we have to iterate over the partitions desc to reach the last replicated partition so that we can start replicating partitions after that. */ + AddPartitionDesc addPartitionDesc = iterator.next(); if (encounteredTheLastReplicatedPartition && tracker.canAddMoreTasks()) { Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); Partition ptn; @@ -257,7 +266,7 @@ private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) t if (replicationSpec.allowReplacementInto(ptn.getParameters())) { if (replicationSpec.isMetadataOnly()) { tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn)); - if (!tracker.canAddMoreTasks()) { + if (iterator.hasNext() && !tracker.canAddMoreTasks()) { tracker.setReplicationState( new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc) ) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 3e2c513..57aad17 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -28,6 +28,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.exec.Task; import 
org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -37,13 +38,13 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; +import org.apache.hadoop.hive.ql.parse.repl.log.logger.IncrementalLoadLogger; +import org.apache.hadoop.hive.ql.parse.repl.log.logger.ReplLogger; import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.Serializable; @@ -76,7 +77,6 @@ Licensed to the Apache Software Foundation (ASF) under one private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; - private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState"); ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); @@ -329,12 +329,9 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { Task evTaskRoot = TaskFactory.get(new DependencyCollectionWork(), conf); Task taskChainTail = evTaskRoot; - int evstage = 0; - int evIter = 0; - - REPL_STATE_LOG.info("Repl Load: Started analyzing Repl load for DB: {} from path {}, Dump Type: INCREMENTAL", - (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? 
dbNameOrPattern : "?", - loadPath.toUri().toString()); + ReplLogger replLogger = new IncrementalLoadLogger(dbNameOrPattern, + loadPath.toString(), dirsInLoadPath.length); + replLogger.startLog(); for (FileStatus dir : dirsInLoadPath){ LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern); @@ -361,15 +358,12 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf); List> evTasks = analyzeEventLoad( dbNameOrPattern, tblNameOrPattern, locn, taskChainTail, eventDmd); - evIter++; - REPL_STATE_LOG.info("Repl Load: Analyzed load for event {}/{} " + - "with ID: {}, Type: {}, Path: {}", - evIter, dirsInLoadPath.length, - dir.getPath().getName(), eventDmd.getDumpType().toString(), locn); - LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0); if ((evTasks != null) && (!evTasks.isEmpty())){ - Task barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf); + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, + dir.getPath().getName(), + eventDmd.getDumpType().toString()); + Task barrierTask = TaskFactory.get(replStateLogWork, conf); for (Task t : evTasks){ t.addDependentTask(barrierTask); LOG.debug("Added {}:{} as a precursor of barrier task {}:{}", @@ -378,14 +372,9 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { LOG.debug("Updated taskChainTail from {}{} to {}{}", taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); taskChainTail = barrierTask; - evstage++; } } rootTasks.add(evTaskRoot); - REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl load for DB: {} from path {} and created import " + - "(DDL/COPY/MOVE) tasks", - (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? 
dbNameOrPattern : "?", - loadPath.toUri().toString()); } } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 1c54d29..235a44c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -207,13 +207,6 @@ public boolean apply(@Nullable Partition partition) { }; } - private static String getLastReplicatedStateFromParameters(Map m) { - if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID.toString()))){ - return m.get(KEY.CURR_STATE_ID.toString()); - } - return null; - } - private void init(ASTNode node){ // -> ^(TOK_REPLICATION $replId $isMetadataOnly) isInReplicationScope = true; @@ -225,6 +218,13 @@ private void init(ASTNode node){ } } + public static String getLastReplicatedStateFromParameters(Map m) { + if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID.toString()))){ + return m.get(KEY.CURR_STATE_ID.toString()); + } + return null; + } + /** * @return true if this statement is being run for the purposes of replication */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index f40c703..a48a17e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -31,6 +31,7 @@ import java.io.DataOutputStream; import java.io.IOException; +import java.util.Collection; import java.util.List; public class Utils { @@ -64,14 +65,18 @@ public static void writeOutput(List values, Path outputFile, HiveConf hi public static Iterable matchesTbl(Hive db, String dbName, String tblPattern) throws HiveException { if (tblPattern == null) { - return Collections2.filter(db.getAllTables(dbName), - tableName -> { - assert tableName != null; - return !tableName.toLowerCase().startsWith( - 
SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase()); - }); + return getAllTables(db, dbName); } else { return db.getTablesByPattern(dbName, tblPattern); } } + + public static Collection getAllTables(Hive db, String dbName) throws HiveException { + return Collections2.filter(db.getAllTables(dbName), + tableName -> { + assert tableName != null; + return !tableName.toLowerCase().startsWith( + SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase()); + }); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java index 3f176aa..caf6f3f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java @@ -49,12 +49,20 @@ import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toReadEntity; public class CreateFunctionHandler extends AbstractMessageHandler { + private String functionName; + + public String getFunctionName() { + return functionName; + } + @Override public List> handle(Context context) throws SemanticException { try { FunctionDescBuilder builder = new FunctionDescBuilder(context); CreateFunctionDesc descToLoad = builder.build(); + this.functionName = builder.metadata.function.getFunctionName(); + context.log.debug("Loading function desc : {}", descToLoad.toString()); Task createTask = TaskFactory.get( new FunctionWork(descToLoad), context.hiveConf diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/BootstrapDumpLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/BootstrapDumpLogger.java new file mode 100644 index 0000000..859e4b1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/BootstrapDumpLogger.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.logger; + +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.ql.parse.repl.log.message.BootstrapDumpBeginLog; +import org.apache.hadoop.hive.ql.parse.repl.log.message.BootstrapDumpEndLog; +import org.apache.hadoop.hive.ql.parse.repl.log.message.BootstrapDumpFunctionLog; +import org.apache.hadoop.hive.ql.parse.repl.log.message.BootstrapDumpTableLog; + +public class BootstrapDumpLogger extends ReplLogger { + private String dbName; + private Long estimatedNumTables; + private Long estimatedNumFunctions; + private Long tableSeqNo; + private Long functionSeqNo; + + public BootstrapDumpLogger(String dbName, int estimatedNumTables, int estimatedNumFunctions) { + this.dbName = dbName; + this.estimatedNumTables = new Long(estimatedNumTables); + this.estimatedNumFunctions = new Long(estimatedNumFunctions); + this.tableSeqNo = new Long(0); + this.functionSeqNo = new Long(0); + } + + @Override + public void startLog() { + (new BootstrapDumpBeginLog(dbName, estimatedNumTables, estimatedNumFunctions)).log(); + } + + @Override + public void tableLog(String tableName, TableType tableType) { + tableSeqNo++; + (new BootstrapDumpTableLog(tableName, tableType, tableSeqNo, 
estimatedNumTables)).log(); + } + + @Override + public void functionLog(String funcName) { + functionSeqNo++; + (new BootstrapDumpFunctionLog(funcName, functionSeqNo, estimatedNumFunctions)).log(); + } + + @Override + public void endLog(String dumpDir, String lastReplId) { + (new BootstrapDumpEndLog(dbName, tableSeqNo, functionSeqNo, dumpDir, lastReplId)).log(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/BootstrapLoadLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/BootstrapLoadLogger.java new file mode 100644 index 0000000..4a0dd55 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/BootstrapLoadLogger.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.log.logger; + +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.ql.parse.repl.log.message.BootstrapLoadBeginLog; +import org.apache.hadoop.hive.ql.parse.repl.log.message.BootstrapLoadEndLog; +import org.apache.hadoop.hive.ql.parse.repl.log.message.BootstrapLoadFunctionLog; +import org.apache.hadoop.hive.ql.parse.repl.log.message.BootstrapLoadTableLog; + +public class BootstrapLoadLogger extends ReplLogger { + private String dbName; + private String dumpDir; + private Long numTables; + private Long numFunctions; + private Long tableSeqNo; + private Long functionSeqNo; + + public BootstrapLoadLogger(String dbName, String dumpDir, long numTables, long numFunctions) { + this.dbName = dbName; + this.dumpDir = dumpDir; + this.numTables = new Long(numTables); + this.numFunctions = new Long(numFunctions); + this.tableSeqNo = new Long(0); + this.functionSeqNo = new Long(0); + } + + @Override + public void startLog() { + (new BootstrapLoadBeginLog(dbName, dumpDir, numTables, numFunctions)).log(); + } + + @Override + public void tableLog(String tableName, TableType tableType) { + tableSeqNo++; + (new BootstrapLoadTableLog(tableName, tableType, tableSeqNo, numTables)).log(); + } + + @Override + public void functionLog(String funcName) { + functionSeqNo++; + (new BootstrapLoadFunctionLog(funcName, functionSeqNo, numFunctions)).log(); + } + + @Override + public void endLog(String dumpDir, String lastReplId) { + (new BootstrapLoadEndLog(dbName, numTables, numFunctions, dumpDir, lastReplId)).log(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/IncrementalDumpLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/IncrementalDumpLogger.java new file mode 100644 index 0000000..2778634 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/IncrementalDumpLogger.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.logger; + +import org.apache.hadoop.hive.ql.parse.repl.log.message.IncrementalDumpBeginLog; +import org.apache.hadoop.hive.ql.parse.repl.log.message.IncrementalDumpEndLog; +import org.apache.hadoop.hive.ql.parse.repl.log.message.IncrementalDumpEventLog; + +public class IncrementalDumpLogger extends ReplLogger { + private String dbName; + private Long estimatedNumEvents; + private Long eventSeqNo; + + public IncrementalDumpLogger(String dbName, long estimatedNumEvents) { + this.dbName = dbName; + this.estimatedNumEvents = new Long(estimatedNumEvents); + this.eventSeqNo = new Long(0); + } + + @Override + public void startLog() { + (new IncrementalDumpBeginLog(dbName, estimatedNumEvents)).log(); + } + + @Override + public void eventLog(String eventId, String eventType) { + eventSeqNo++; + (new IncrementalDumpEventLog(eventId, eventType, eventSeqNo, estimatedNumEvents)).log(); + } + + @Override + public void endLog(String dumpDir, String lastReplId) { + (new IncrementalDumpEndLog(dbName, eventSeqNo, dumpDir, lastReplId)).log(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/IncrementalLoadLogger.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/IncrementalLoadLogger.java new file mode 100644 index 0000000..bb60bea --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/IncrementalLoadLogger.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.log.logger; + +import org.apache.hadoop.hive.ql.parse.repl.log.message.IncrementalLoadBeginLog; +import org.apache.hadoop.hive.ql.parse.repl.log.message.IncrementalLoadEndLog; +import org.apache.hadoop.hive.ql.parse.repl.log.message.IncrementalLoadEventLog; + +public class IncrementalLoadLogger extends ReplLogger { + private String dbName; + private String dumpDir; + private Long numEvents; + private Long eventSeqNo; + + public IncrementalLoadLogger(String dbName, String dumpDir, int numEvents) { + this.dbName = dbName; + this.dumpDir = dumpDir; + this.numEvents = new Long(numEvents); + this.eventSeqNo = new Long(0); + } + + @Override + public void startLog() { + (new IncrementalLoadBeginLog(dbName, dumpDir, numEvents)).log(); + } + + @Override + public void eventLog(String eventId, String eventType) { + eventSeqNo++; + (new IncrementalLoadEventLog(eventId, eventType, eventSeqNo, numEvents)).log(); + } + + @Override + public void endLog(String dumpDir, String lastReplId) { + (new IncrementalLoadEndLog(dbName, numEvents, dumpDir, lastReplId)).log(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/ReplLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/ReplLogger.java new file mode 100644 index 0000000..d32ee6b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/logger/ReplLogger.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.logger; + +import org.apache.hadoop.hive.metastore.TableType; + +public abstract class ReplLogger { + + ReplLogger() { + } + + public void tableLog(String tableName, TableType tableType) { + } + public void functionLog(String funcName){ + } + public void eventLog(String eventId, String eventType) { + } + + public abstract void startLog(); + public abstract void endLog(String dumpDir, String lastReplId); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/AbstractReplLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/AbstractReplLog.java new file mode 100644 index 0000000..42f14ab --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/AbstractReplLog.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class AbstractReplLog { + static final Logger REPL_LOG = LoggerFactory.getLogger("ReplState"); + static final ObjectMapper mapper = new ObjectMapper(); // Thread-safe. + + static { + mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, false); + mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, false); + mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, false); + } + + public void log() { + try { + REPL_LOG.info("REPL_STATE: {}", mapper.writeValueAsString(this)); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapDumpBeginLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapDumpBeginLog.java new file mode 100644 index 0000000..7c0e96e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapDumpBeginLog.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.codehaus.jackson.annotate.JsonProperty; + +public class BootstrapDumpBeginLog extends AbstractReplLog { + @JsonProperty + private String dbName; + + @JsonProperty + private DumpType dumpType; + + @JsonProperty + private Long estimatedNumTables; + + @JsonProperty + private Long estimatedNumFunctions; + + @JsonProperty + private Long dumpStartTime; + + public BootstrapDumpBeginLog(String dbName, Long estimatedNumTables, Long estimatedNumFunctions) { + this.dbName = dbName; + this.dumpType = DumpType.BOOTSTRAP; + this.estimatedNumTables = estimatedNumTables; + this.estimatedNumFunctions = estimatedNumFunctions; + this.dumpStartTime = System.currentTimeMillis() / 1000; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapDumpEndLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapDumpEndLog.java new file mode 100644 index 0000000..1eb7433 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapDumpEndLog.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.codehaus.jackson.annotate.JsonProperty; + +public class BootstrapDumpEndLog extends AbstractReplLog { + @JsonProperty + private String dbName; + + @JsonProperty + private DumpType dumpType; + + @JsonProperty + private Long actualNumTables; + + @JsonProperty + private Long actualNumFunctions; + + @JsonProperty + private Long dumpEndTime; + + @JsonProperty + private String dumpDir; + + @JsonProperty + private String lastReplId; + + public BootstrapDumpEndLog(String dbName, + Long actualNumTables, + Long actualNumFunctions, + String dumpDir, + String lastReplId) { + this.dbName = dbName; + this.dumpType = DumpType.BOOTSTRAP; + this.actualNumTables = actualNumTables; + this.actualNumFunctions = actualNumFunctions; + this.dumpEndTime = System.currentTimeMillis() / 1000; + this.dumpDir = dumpDir; + this.lastReplId = lastReplId; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapDumpFunctionLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapDumpFunctionLog.java new file mode 100644 index 0000000..ebd03cf --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapDumpFunctionLog.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.codehaus.jackson.annotate.JsonProperty; + +public class BootstrapDumpFunctionLog extends AbstractReplLog { + @JsonProperty + private String functionName; + + @JsonProperty + private String functionsDumpProgress; + + @JsonProperty + private Long dumpTime; + + public BootstrapDumpFunctionLog(String funcName, Long functionSeqNo, Long estimatedNumFunctions) { + this.functionName = funcName; + this.functionsDumpProgress = functionSeqNo + "/" + estimatedNumFunctions; + this.dumpTime = System.currentTimeMillis() / 1000; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapDumpTableLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapDumpTableLog.java new file mode 100644 index 0000000..862fb91 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapDumpTableLog.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.apache.hadoop.hive.metastore.TableType; +import org.codehaus.jackson.annotate.JsonProperty; + +public class BootstrapDumpTableLog extends AbstractReplLog { + @JsonProperty + private String tableName; + + @JsonProperty + private String tableType; + + @JsonProperty + private String tablesDumpProgress; + + @JsonProperty + private Long dumpTime; + + public BootstrapDumpTableLog(String tableName, + TableType tableType, + Long tableSeqNo, + Long estimatedNumTables) { + this.tableName = tableName; + this.tableType = tableType.toString(); + this.tablesDumpProgress = tableSeqNo + "/" + estimatedNumTables; + this.dumpTime = System.currentTimeMillis() / 1000; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapLoadBeginLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapLoadBeginLog.java new file mode 100644 index 0000000..1a0ec6a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapLoadBeginLog.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.codehaus.jackson.annotate.JsonProperty; + +public class BootstrapLoadBeginLog extends AbstractReplLog { + @JsonProperty + private String dbName; + + @JsonProperty + private String dumpDir; + + @JsonProperty + private DumpType loadType; + + @JsonProperty + private Long numTables; + + @JsonProperty + private Long numFunctions; + + @JsonProperty + private Long loadStartTime; + + public BootstrapLoadBeginLog(String dbName, String dumpDir, Long numTables, Long numFunctions) { + this.dbName = dbName; + this.dumpDir = dumpDir; + this.loadType = DumpType.BOOTSTRAP; + this.numTables = numTables; + this.numFunctions = numFunctions; + this.loadStartTime = System.currentTimeMillis() / 1000; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapLoadEndLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapLoadEndLog.java new file mode 100644 index 0000000..3221a46 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapLoadEndLog.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.codehaus.jackson.annotate.JsonProperty; + +public class BootstrapLoadEndLog extends AbstractReplLog { + @JsonProperty + private String dbName; + + @JsonProperty + private DumpType loadType; + + @JsonProperty + private Long numTables; + + @JsonProperty + private Long numFunctions; + + @JsonProperty + private Long loadEndTime; + + @JsonProperty + private String dumpDir; + + @JsonProperty + private String lastReplId; + + public BootstrapLoadEndLog(String dbName, + Long numTables, + Long numFunctions, + String dumpDir, + String lastReplId) { + this.dbName = dbName; + this.loadType = DumpType.BOOTSTRAP; + this.numTables = numTables; + this.numFunctions = numFunctions; + this.loadEndTime = System.currentTimeMillis() / 1000; + this.dumpDir = dumpDir; + this.lastReplId = lastReplId; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapLoadFunctionLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapLoadFunctionLog.java new file mode 100644 index 0000000..dc1ef0a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapLoadFunctionLog.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.codehaus.jackson.annotate.JsonProperty; + +public class BootstrapLoadFunctionLog extends AbstractReplLog { + @JsonProperty + private String functionName; + + @JsonProperty + private String functionsLoadProgress; + + @JsonProperty + private Long loadTime; + + public BootstrapLoadFunctionLog(String funcName, Long functionSeqNo, Long numFunctions) { + this.functionName = funcName; + this.functionsLoadProgress = functionSeqNo + "/" + numFunctions; + this.loadTime = System.currentTimeMillis() / 1000; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapLoadTableLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapLoadTableLog.java new file mode 100644 index 0000000..8a8a092 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/BootstrapLoadTableLog.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.apache.hadoop.hive.metastore.TableType; +import org.codehaus.jackson.annotate.JsonProperty; + +public class BootstrapLoadTableLog extends AbstractReplLog { + @JsonProperty + private String tableName; + + @JsonProperty + private String tableType; + + @JsonProperty + private String tablesLoadProgress; + + @JsonProperty + private Long loadTime; + + public BootstrapLoadTableLog(String tableName, + TableType tableType, + Long tableSeqNo, + Long numTables) { + this.tableName = tableName; + this.tableType = tableType.toString(); + this.tablesLoadProgress = tableSeqNo + "/" + numTables; + this.loadTime = System.currentTimeMillis() / 1000; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalDumpBeginLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalDumpBeginLog.java new file mode 100644 index 0000000..9ca592b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalDumpBeginLog.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.codehaus.jackson.annotate.JsonProperty; + +public class IncrementalDumpBeginLog extends AbstractReplLog { + @JsonProperty + private String dbName; + + @JsonProperty + private DumpType dumpType; + + @JsonProperty + private Long estimatedNumEvents; + + @JsonProperty + private Long dumpStartTime; + + public IncrementalDumpBeginLog(String dbName, Long estimatedNumEvents) { + this.dbName = dbName; + this.dumpType = DumpType.INCREMENTAL; + this.estimatedNumEvents = estimatedNumEvents; + this.dumpStartTime = System.currentTimeMillis() / 1000; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalDumpEndLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalDumpEndLog.java new file mode 100644 index 0000000..568fca3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalDumpEndLog.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.codehaus.jackson.annotate.JsonProperty; + +public class IncrementalDumpEndLog extends AbstractReplLog { + @JsonProperty + private String dbName; + + @JsonProperty + private DumpType dumpType; + + @JsonProperty + private Long actualNumEvents; + + @JsonProperty + private Long dumpEndTime; + + @JsonProperty + private String dumpDir; + + @JsonProperty + private String lastReplId; + + public IncrementalDumpEndLog(String dbName, + Long actualNumEvents, + String dumpDir, + String lastReplId) { + this.dbName = dbName; + this.dumpType = DumpType.INCREMENTAL; + this.actualNumEvents = actualNumEvents; + this.dumpEndTime = System.currentTimeMillis() / 1000; + this.dumpDir = dumpDir; + this.lastReplId = lastReplId; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalDumpEventLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalDumpEventLog.java new file mode 100644 index 0000000..9b4128a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalDumpEventLog.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.codehaus.jackson.annotate.JsonProperty; + +public class IncrementalDumpEventLog extends AbstractReplLog { + @JsonProperty + private String eventId; + + @JsonProperty + private String eventType; + + @JsonProperty + private String eventsDumpProgress; + + @JsonProperty + private Long dumpTime; + + public IncrementalDumpEventLog(String eventId, + String eventType, + Long eventSeqNo, + Long estimatedNumEvents) { + this.eventId = eventId; + this.eventType = eventType; + this.eventsDumpProgress = eventSeqNo + "/" + estimatedNumEvents; + this.dumpTime = System.currentTimeMillis() / 1000; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalLoadBeginLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalLoadBeginLog.java new file mode 100644 index 0000000..c1a2e26 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalLoadBeginLog.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.codehaus.jackson.annotate.JsonProperty; + +public class IncrementalLoadBeginLog extends AbstractReplLog { + @JsonProperty + private String dbName; + + @JsonProperty + private String dumpDir; + + @JsonProperty + private DumpType loadType; + + @JsonProperty + private Long numEvents; + + @JsonProperty + private Long loadStartTime; + + public IncrementalLoadBeginLog(String dbName, String dumpDir, Long numEvents) { + this.dbName = dbName; + this.dumpDir = dumpDir; + this.loadType = DumpType.INCREMENTAL; + this.numEvents = numEvents; + this.loadStartTime = System.currentTimeMillis() / 1000; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalLoadEndLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalLoadEndLog.java new file mode 100644 index 0000000..daf0057 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalLoadEndLog.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.codehaus.jackson.annotate.JsonProperty; + +public class IncrementalLoadEndLog extends AbstractReplLog { + @JsonProperty + private String dbName; + + @JsonProperty + private DumpType loadType; + + @JsonProperty + private Long numEvents; + + @JsonProperty + private Long loadEndTime; + + @JsonProperty + private String dumpDir; + + @JsonProperty + private String lastReplId; + + public IncrementalLoadEndLog(String dbName, + Long numEvents, + String dumpDir, + String lastReplId) { + this.dbName = dbName; + this.loadType = DumpType.INCREMENTAL; + this.numEvents = numEvents; + this.loadEndTime = System.currentTimeMillis() / 1000; + this.dumpDir = dumpDir; + this.lastReplId = lastReplId; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalLoadEventLog.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalLoadEventLog.java new file mode 100644 index 0000000..a603340 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/log/message/IncrementalLoadEventLog.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.log.message; + +import org.codehaus.jackson.annotate.JsonProperty; + +public class IncrementalLoadEventLog extends AbstractReplLog { + @JsonProperty + private String eventId; + + @JsonProperty + private String eventType; + + @JsonProperty + private String eventsLoadProgress; + + @JsonProperty + private Long loadTime; + + public IncrementalLoadEventLog(String eventId, + String eventType, + Long eventSeqNo, + Long numEvents) { + this.eventId = eventId; + this.eventType = eventType; + this.eventsLoadProgress = eventSeqNo + "/" + numEvents; + this.loadTime = System.currentTimeMillis() / 1000; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java index e574a47..5ded50e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; import 
org.apache.hadoop.hive.ql.exec.Task; @@ -34,11 +35,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.CreateTableDesc; -import org.apache.hadoop.hive.ql.plan.CreateViewDesc; /** * ImportTableDesc. @@ -56,7 +54,7 @@ public ImportTableDesc(String dbName, Table table) throws Exception { this.dbName = dbName; this.table = table; - switch (getTableType()) { + switch (getDescType()) { case TABLE: this.createTblDesc = new CreateTableDesc(dbName, table.getTableName(), @@ -122,7 +120,7 @@ public ImportTableDesc(String dbName, Table table) throws Exception { } } - public TYPE getTableType() { + public TYPE getDescType() { if (table.isView() || table.isMaterializedView()) { return TYPE.VIEW; } @@ -143,7 +141,7 @@ public void setViewAsReferenceText(String dbName, Table table) { } public void setReplicationSpec(ReplicationSpec replSpec) { - switch (getTableType()) { + switch (getDescType()) { case TABLE: createTblDesc.setReplicationSpec(replSpec); break; @@ -154,20 +152,20 @@ public void setReplicationSpec(ReplicationSpec replSpec) { } public void setExternal(boolean isExternal) { - if (TYPE.TABLE.equals(getTableType())) { + if (TYPE.TABLE.equals(getDescType())) { createTblDesc.setExternal(isExternal); } } public boolean isExternal() { - if (TYPE.TABLE.equals(getTableType())) { + if (TYPE.TABLE.equals(getDescType())) { return createTblDesc.isExternal(); } return false; } public void setLocation(String location) { - switch (getTableType()) { + switch (getDescType()) { case TABLE: createTblDesc.setLocation(location); break; @@ -178,7 +176,7 @@ public void setLocation(String location) { } public String getLocation() { - switch (getTableType()) { + switch 
(getDescType()) { case TABLE: return createTblDesc.getLocation(); case VIEW: @@ -188,7 +186,7 @@ public String getLocation() { } public void setTableName(String tableName) throws SemanticException { - switch (getTableType()) { + switch (getDescType()) { case TABLE: createTblDesc.setTableName(tableName); break; @@ -201,7 +199,7 @@ public void setTableName(String tableName) throws SemanticException { } public String getTableName() throws SemanticException { - switch (getTableType()) { + switch (getDescType()) { case TABLE: return createTblDesc.getTableName(); case VIEW: @@ -213,7 +211,7 @@ public String getTableName() throws SemanticException { } public List getPartCols() { - switch (getTableType()) { + switch (getDescType()) { case TABLE: return createTblDesc.getPartCols(); case VIEW: @@ -223,7 +221,7 @@ public String getTableName() throws SemanticException { } public List getCols() { - switch (getTableType()) { + switch (getDescType()) { case TABLE: return createTblDesc.getCols(); case VIEW: @@ -233,7 +231,7 @@ public String getTableName() throws SemanticException { } public Map getTblProps() { - switch (getTableType()) { + switch (getDescType()) { case TABLE: return createTblDesc.getTblProps(); case VIEW: @@ -243,7 +241,7 @@ public String getTableName() throws SemanticException { } public String getInputFormat() { - switch (getTableType()) { + switch (getDescType()) { case TABLE: return createTblDesc.getInputFormat(); case VIEW: @@ -253,7 +251,7 @@ public String getInputFormat() { } public String getOutputFormat() { - switch (getTableType()) { + switch (getDescType()) { case TABLE: return createTblDesc.getOutputFormat(); case VIEW: @@ -263,7 +261,7 @@ public String getOutputFormat() { } public String getSerName() { - switch (getTableType()) { + switch (getDescType()) { case TABLE: return createTblDesc.getSerName(); case VIEW: @@ -273,7 +271,7 @@ public String getSerName() { } public Map getSerdeProps() { - switch (getTableType()) { + switch (getDescType()) { case 
TABLE: return createTblDesc.getSerdeProps(); case VIEW: @@ -283,14 +281,14 @@ public String getSerName() { } public List getBucketCols() { - if (TYPE.TABLE.equals(getTableType())) { + if (TYPE.TABLE.equals(getDescType())) { return createTblDesc.getBucketCols(); } return null; } public List getSortCols() { - if (TYPE.TABLE.equals(getTableType())) { + if (TYPE.TABLE.equals(getDescType())) { return createTblDesc.getSortCols(); } return null; @@ -300,7 +298,7 @@ public String getSerName() { * @param replaceMode Determine if this CreateTable should behave like a replace-into alter instead */ public void setReplaceMode(boolean replaceMode) { - if (TYPE.TABLE.equals(getTableType())) { + if (TYPE.TABLE.equals(getDescType())) { createTblDesc.setReplaceMode(replaceMode); } } @@ -311,7 +309,7 @@ public String getDatabaseName() { public Task getCreateTableTask(HashSet inputs, HashSet outputs, HiveConf conf) { - switch (getTableType()) { + switch (getDescType()) { case TABLE: return TaskFactory.get(new DDLWork(inputs, outputs, createTblDesc), conf); case VIEW: @@ -328,4 +326,13 @@ public String getDatabaseName() { public boolean isMaterializedView() { return table.isMaterializedView(); } + + public TableType tableType() { + if (isView()) { + return TableType.VIRTUAL_VIEW; + } else if (isMaterializedView()) { + return TableType.MATERIALIZED_VIEW; + } + return TableType.MANAGED_TABLE; + } } diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote old mode 100644 new mode 100755