diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
index 64ce100..e697b54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
@@ -78,7 +78,7 @@
    */
   private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|").
       join(new String[]{"org.apache.hadoop.mapreduce.JobSubmitter",
-          "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(),
+          "org.apache.hadoop.mapreduce.Job", "SessionState", "ReplState", Task.class.getName(),
           Driver.class.getName(), "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"}));
 
   /* Patterns that are included in performance logging level.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 37aa3ba..402919e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -122,6 +122,10 @@
   public static final String DUMPMETADATA = "_dumpmetadata";
 
+  private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
+  private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
+  private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");
+
   public enum DUMPTYPE {
     BOOTSTRAP("BOOTSTRAP"),
     INCREMENTAL("INCREMENTAL"),
@@ -353,6 +357,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
         // bootstrap case
         Long bootDumpBeginReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
         for (String dbName : matchesDb(dbNameOrPattern)) {
+          REPL_STATE_LOG.info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP", dbName);
           LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
           Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
           dumpFunctionMetadata(dbName, dumpRoot);
@@ -361,6 +366,9 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
                     + " to db root " + dbRoot.toUri());
             dumpTbl(ast, dbName, tblName, dbRoot);
           }
+          REPL_STATE_LOG.info("Repl Dump: Completed analyzing Repl Dump for DB: {} and created {} COPY tasks to dump "
+              + "metadata and data",
+              dbName, rootTasks.size());
         }
         Long bootDumpEndReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
         LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, bootDumpEndReplId);
@@ -425,6 +433,8 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
        EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
            evFetcher, eventFrom, maxEventLimit, evFilter);
        lastReplId = eventTo;
+       REPL_STATE_LOG.info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: INCREMENTAL",
+           (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?");
        while (evIter.hasNext()){
          NotificationEvent ev = evIter.next();
          lastReplId = ev.getEventId();
@@ -432,6 +442,9 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
          dumpEvent(ev, evRoot, cmRoot);
        }
 
+       REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}",
+           (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?");
+
        LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
        writeOutput(
            Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(lastReplId)),
@@ -457,6 +470,8 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
         getNewEventOnlyReplicationSpec(ev.getEventId())
     );
     EventHandlerFactory.handlerFor(ev).handle(context);
+    REPL_STATE_LOG.info("Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and data to path {}",
+        String.valueOf(ev.getEventId()), ev.getEventType(), evRoot.toUri().toString());
   }
 
   public static void injectNextDumpDirForTest(String dumpdir){
@@ -496,6 +511,8 @@ private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticException {
       Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
       Database dbObj = db.getDatabase(dbName);
       EximUtil.createDbExportDump(fs, dumpPath, dbObj, getNewReplicationSpec());
+      REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata");
+
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e);
@@ -503,10 +520,6 @@ private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticException {
     return dbRoot;
   }
 
-  private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
-  private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
-  private final static Logger SESSION_STATE_LOG = LoggerFactory.getLogger("SessionState");
-
   private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticException {
     Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
     try {
@@ -516,7 +529,7 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticException {
         org.apache.hadoop.hive.metastore.api.Function function =
             db.getFunction(dbName, functionName);
         if (function.getResourceUris().isEmpty()) {
-          SESSION_STATE_LOG.warn(
+          REPL_STATE_LOG.warn(
               "Not replicating function: " + functionName + " as it seems to have been created "
                   + "without USING clause");
           continue;
@@ -528,6 +541,7 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticException {
             functionMetadataRoot)) {
           new FunctionSerializer(function).writeTo(jsonWriter, getNewReplicationSpec());
         }
+        REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName);
       }
     } catch (Exception e) {
       throw new SemanticException(e);
@@ -548,8 +562,11 @@ private Path dumpTbl(ASTNode ast, String dbName, String tblName, Path dbRoot) throws SemanticException {
     try {
       URI toURI = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
       TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
+
       ExportSemanticAnalyzer.prepareExport(ast, toURI, ts, getNewReplicationSpec(), db, conf, ctx,
           rootTasks, inputs, outputs, LOG);
+      REPL_STATE_LOG.info("Repl Dump: Analyzed dump for table/view: {}.{} and created copy tasks to dump metadata "
+          + "and data to path {}", dbName, tblName, toURI.toString());
     } catch (HiveException e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e);
@@ -696,10 +713,13 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         Task<? extends Serializable> taskChainTail = evTaskRoot;
         int evstage = 0;
+        int evIter = 0;
         Long lastEvid = null;
         Map<String, Long> dbsUpdated = new ReplicationSpec.ReplStateMap<String, Long>();
         Map<String, Long> tablesUpdated = new ReplicationSpec.ReplStateMap<String, Long>();
 
+        REPL_STATE_LOG.info("Repl Load: Started analyzing Repl load from path {}, Dump Type: INCREMENTAL",
+            loadPath.toUri().toString());
         for (FileStatus dir : dirsInLoadPath){
           LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
           // event loads will behave similar to table loads, with one crucial difference
@@ -726,6 +746,12 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
            List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
                dbNameOrPattern, tblNameOrPattern, locn, taskChainTail,
                dbsUpdated, tablesUpdated, eventDmd);
+           evIter++;
+           REPL_STATE_LOG.info("Repl Load: Analyzed load for event {}/{} "
+               + "with ID: {}, Type: {}, Path: {}",
+               evIter, dirsInLoadPath.length,
+               dir.getPath().getName(), eventDmd.getDumpType().toString(), locn);
+
            LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0);
            if ((evTasks != null) && (!evTasks.isEmpty())){
              Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
@@ -813,6 +839,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
           taskChainTail = updateReplIdTask;
         }
         rootTasks.add(evTaskRoot);
+        REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl load from path {} and created import (DDL/COPY/MOVE) tasks",
+            loadPath.toUri().toString());
       }
 
     } catch (Exception e) {
@@ -1142,6 +1170,9 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
        dbName = dbObj.getName();
      }
 
+     REPL_STATE_LOG.info("Repl Load: Started analyzing Repl Load for DB: {} from Dump Dir: {}, Dump Type: BOOTSTRAP",
+         dbName, dir.getPath().toUri().toString());
+
      Task<? extends Serializable> dbRootTask = null;
      if (existEmptyDb(dbName)) {
        AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, dbObj.getParameters());
@@ -1166,7 +1197,12 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
      for (FileStatus tableDir : dirsInDbPath) {
        analyzeTableLoad(
            dbName, null, tableDir.getPath().toUri().toString(), dbRootTask, null, null);
+       REPL_STATE_LOG.info("Repl Load: Analyzed table/view/partition load from path {}",
+           tableDir.getPath().toUri().toString());
      }
+
+     REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl Load for DB: {} and created import (DDL/COPY/MOVE) tasks",
+         dbName);
    } catch (Exception e) {
      throw new SemanticException(e);
    }
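
Reviewer note (not part of the patch): the LogDivertAppender change is what makes every REPL_STATE_LOG message above visible to clients, because log events are diverted into the per-operation log only when their logger name matches executionIncludeNamePattern; adding "ReplState" admits the new logger at EXECUTION verbosity. Below is a minimal, self-contained sketch of that name filter. The two fully qualified names standing in for Task.class.getName() and Driver.class.getName() are assumptions made so the example compiles on its own; the real pattern is built from the class literals shown in the diff.

import java.util.regex.Pattern;

public class ReplStateFilterSketch {
  // Mirrors the patched executionIncludeNamePattern: a "|"-joined alternation of
  // logger names whose events get diverted into the HiveServer2 operation log.
  private static final Pattern EXECUTION_INCLUDE = Pattern.compile(String.join("|",
      "org.apache.hadoop.mapreduce.JobSubmitter",
      "org.apache.hadoop.mapreduce.Job",
      "SessionState",
      "ReplState",                            // newly admitted by this patch
      "org.apache.hadoop.hive.ql.exec.Task",  // stands in for Task.class.getName()
      "org.apache.hadoop.hive.ql.Driver",     // stands in for Driver.class.getName()
      "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"));

  public static void main(String[] args) {
    // Events from LoggerFactory.getLogger("ReplState") now match the filter and are
    // diverted; before the patch the name did not match, so they were dropped.
    System.out.println(EXECUTION_INCLUDE.matcher("ReplState").matches());       // true
    System.out.println(EXECUTION_INCLUDE.matcher("SomeOtherLogger").matches()); // false
  }
}

This also motivates replacing SESSION_STATE_LOG with the dedicated "ReplState" logger: replication progress messages still reach the operation log, but they can now be routed or silenced independently of SessionState logging.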