diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
index 64ce100..e697b54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
@@ -78,7 +78,7 @@
    */
   private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|").
       join(new String[]{"org.apache.hadoop.mapreduce.JobSubmitter",
-          "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(),
+          "org.apache.hadoop.mapreduce.Job", "SessionState", "ReplState", Task.class.getName(),
           Driver.class.getName(), "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"}));
 
   /* Patterns that are included in performance logging level.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 2daa123..e2b636d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -113,6 +113,9 @@
   private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
+  private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
+  private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
+  private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");
 
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
@@ -200,6 +203,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
       // bootstrap case
       Long bootDumpBeginReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
       for (String dbName : matchesDb(dbNameOrPattern)) {
+        REPL_STATE_LOG.info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP", dbName);
         LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
         Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
         dumpFunctionMetadata(dbName, dumpRoot);
@@ -208,6 +212,9 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
               + " to db root " + dbRoot.toUri());
           dumpTbl(ast, dbName, tblName, dbRoot);
         }
+        REPL_STATE_LOG.info("Repl Dump: Completed analyzing Repl Dump for DB: {} and created {} COPY tasks to dump "
+            + "metadata and data",
+            dbName, rootTasks.size());
       }
       Long bootDumpEndReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
       LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, bootDumpEndReplId);
@@ -272,6 +279,8 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
           evFetcher, eventFrom, maxEventLimit, evFilter);
 
       lastReplId = eventTo;
+      REPL_STATE_LOG.info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: INCREMENTAL",
+          (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?");
       while (evIter.hasNext()){
         NotificationEvent ev = evIter.next();
         lastReplId = ev.getEventId();
@@ -279,6 +288,9 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
         dumpEvent(ev, evRoot, cmRoot);
       }
 
+      REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}",
+          (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?");
+
       LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
       Utils.writeOutput(
           Arrays.asList(
@@ -308,6 +320,8 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Ex
         getNewEventOnlyReplicationSpec(ev.getEventId())
     );
     EventHandlerFactory.handlerFor(ev).handle(context);
+    REPL_STATE_LOG.info("Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and data to path {}",
+        String.valueOf(ev.getEventId()), ev.getEventType(), evRoot.toUri().toString());
   }
 
   public static void injectNextDumpDirForTest(String dumpdir){
@@ -347,6 +361,7 @@ private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticExcepti
       Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
       HiveWrapper.Tuple<Database> database = new HiveWrapper(db, dbName).database();
       EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
+      REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata");
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e);
@@ -354,10 +369,6 @@ private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticExcepti
     return dbRoot;
   }
 
-  private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
-  private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
-  private final static Logger SESSION_STATE_LOG = LoggerFactory.getLogger("SessionState");
-
   private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticException {
     Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
     try {
@@ -374,7 +385,7 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticE
           continue;
         }
         if (tuple.object.getResourceUris().isEmpty()) {
-          SESSION_STATE_LOG.warn(
+          REPL_STATE_LOG.warn(
              "Not replicating function: " + functionName + " as it seems to have been created "
                   + "without USING clause");
          continue;
@@ -386,6 +397,7 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticE
             functionMetadataRoot)) {
           new FunctionSerializer(tuple.object).writeTo(jsonWriter, tuple.replicationSpec);
         }
+        REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName);
       }
     } catch (Exception e) {
       throw new SemanticException(e);
@@ -406,8 +418,11 @@ private Path dumpTbl(ASTNode ast, String dbName, String tblName, Path dbRoot) th
     try {
       URI toURI = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
       TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
+
       ExportSemanticAnalyzer.prepareExport(ast, toURI, ts, getNewReplicationSpec(), db, conf, ctx,
           rootTasks, inputs, outputs, LOG);
+      REPL_STATE_LOG.info("Repl Dump: Analyzed dump for table/view: {}.{} and created copy tasks to dump metadata "
+          + "and data to path {}", dbName, tblName, toURI.toString());
     } catch (HiveException e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e);
@@ -554,10 +569,14 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         Task<? extends Serializable> taskChainTail = evTaskRoot;
 
         int evstage = 0;
+        int evIter = 0;
         Long lastEvid = null;
         Map<String, Long> dbsUpdated = new ReplicationSpec.ReplStateMap<String, Long>();
         Map<String, Long> tablesUpdated = new ReplicationSpec.ReplStateMap<String, Long>();
 
+        REPL_STATE_LOG.info("Repl Load: Started analyzing Repl load for DB: {} from path {}, Dump Type: INCREMENTAL",
+            (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?",
+            loadPath.toUri().toString());
         for (FileStatus dir : dirsInLoadPath){
           LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
           // event loads will behave similar to table loads, with one crucial difference
@@ -584,6 +603,12 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
           List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
               dbNameOrPattern, tblNameOrPattern, locn, taskChainTail,
               dbsUpdated, tablesUpdated, eventDmd);
+          evIter++;
+          REPL_STATE_LOG.info("Repl Load: Analyzed load for event {}/{} "
+              + "with ID: {}, Type: {}, Path: {}",
+              evIter, dirsInLoadPath.length,
+              dir.getPath().getName(), eventDmd.getDumpType().toString(), locn);
+
           LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0);
           if ((evTasks != null) && (!evTasks.isEmpty())){
             Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
@@ -671,6 +696,10 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
           taskChainTail = updateReplIdTask;
         }
         rootTasks.add(evTaskRoot);
+        REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl load for DB: {} from path {} and created import "
+            + "(DDL/COPY/MOVE) tasks",
+            (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?",
+            loadPath.toUri().toString());
       }
 
     } catch (Exception e) {
@@ -1000,6 +1029,9 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
         dbName = dbObj.getName();
       }
 
+      REPL_STATE_LOG.info("Repl Load: Started analyzing Repl Load for DB: {} from Dump Dir: {}, Dump Type: BOOTSTRAP",
+          dbName, dir.getPath().toUri().toString());
+
       Task<? extends Serializable> dbRootTask = null;
       if (existEmptyDb(dbName)) {
         AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, dbObj.getParameters());
@@ -1024,6 +1056,8 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
       for (FileStatus tableDir : Collections2.filter(Arrays.asList(dirsInDbPath), new TableDirPredicate())) {
         analyzeTableLoad(
             dbName, null, tableDir.getPath().toUri().toString(), dbRootTask, null, null);
+        REPL_STATE_LOG.info("Repl Load: Analyzed table/view/partition load from path {}",
+            tableDir.getPath().toUri().toString());
       }
 
       //Function load
@@ -1033,8 +1067,13 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
           Arrays.asList(fs.listStatus(functionMetaDataRoot, EximUtil.getDirectoryFilter(fs)));
       for (FileStatus functionDir : functionDirectories) {
         analyzeFunctionLoad(dbName, functionDir, dbRootTask);
+        REPL_STATE_LOG.info("Repl Load: Analyzed function load from path {}",
+            functionDir.getPath().toUri().toString());
       }
       }
+
+      REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl Load for DB: {} and created import (DDL/COPY/MOVE) tasks",
+          dbName);
     } catch (Exception e) {
       throw new SemanticException(e);
     }