diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 8f19b7d85d..654bdf802a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -20,6 +20,7 @@
 import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
@@ -27,6 +28,7 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.processors.DfsProcessor;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hive.common.util.HiveVersionInfo;
@@ -77,6 +79,7 @@
 import java.util.Set;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.junit.rules.TestName;
 import static org.apache.hadoop.hive.conf.SystemVariables.SET_COLUMN_NAME;
 import static org.apache.hadoop.hive.ql.exec.ExplainTask.EXPL_COLUMN_NAME;
@@ -121,6 +124,7 @@
   private static final float floatCompareDelta = 0.0001f;
 
   @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public final TestName testName = new TestName();
 
   private static Connection getConnection(String postfix) throws SQLException {
     Connection con1;
@@ -2767,10 +2771,10 @@ public void run() {
           incrementalLogs.addAll(statement.getQueryLog());
           Thread.sleep(500);
         } catch (SQLException e) {
-          LOG.error("Failed getQueryLog. Error message: " + e.getMessage());
+          LOG.info("Failed getQueryLog. Error message: " + e.getMessage());
           fail("error in getting log thread");
         } catch (InterruptedException e) {
-          LOG.error("Getting log thread is interrupted. Error message: " + e.getMessage());
+          LOG.info("Getting log thread is interrupted. Error message: " + e.getMessage());
           fail("error in getting log thread");
         }
       }
@@ -2793,6 +2797,126 @@ public void run() {
     verifyFetchedLog(incrementalLogs, expectedLogs);
   }
 
+  private static int next = 0;
+  private synchronized void advanceDumpDir() {
+    next++;
+    ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
+  }
+
+  /**
+   * Tests fetching the query log over JDBC for REPL commands.
+   * @throws Exception
+   */
+  @Test
+  public void testGetQueryLogForReplCommands() throws Exception {
+    // Prepare
+    String primaryDb = testName.getMethodName() + "_" + System.currentTimeMillis();
+    String replicaDb = primaryDb + "_replica";
+    String primaryTblName = primaryDb + ".t1";
+    Path replDir = new Path(conf.get("test.data.files"));
+    HiveStatement stmt = (HiveStatement) con.createStatement();
+    assertNotNull("Statement is null", stmt);
+
+    replDir = new Path(replDir, primaryDb + "_repl");
+    FileSystem fs = FileSystem.get(replDir.toUri(), conf);
+    fs.mkdirs(replDir);
+
+    try {
+      // Prepare
+      stmt.execute("set hive.exec.parallel = true");
+      stmt.execute("set hive.server2.logging.operation.level = execution");
+      stmt.execute("set hive.metastore.transactional.event.listeners ="
+          + " org.apache.hive.hcatalog.listener.DbNotificationListener");
+      stmt.execute("set hive.metastore.dml.events = true");
+      stmt.execute("create database " + primaryDb + " with dbproperties('repl.source.for'='1,2,3')");
+      stmt.execute("create table " + primaryTblName + " (id int)");
+      stmt.execute("insert into " + primaryTblName + " values (1), (2)");
+      stmt.close();
+
+      // Test query logs for bootstrap dump and load
+      String[] expectedBootstrapDumpLogs = {
+          "REPL::START",
+          "REPL::TABLE_DUMP",
+          "REPL::END"
+      };
+
+      // Bootstrap dump
+      stmt = (HiveStatement) con.createStatement();
+      advanceDumpDir();
+      ResultSet replDumpRslt = stmt.executeQuery("repl dump " + primaryDb
+          + " with ('hive.repl.rootdir' = '" + replDir + "')");
+      assertTrue(replDumpRslt.next());
+      String dumpLocation = replDumpRslt.getString(1);
+      String lastReplId = replDumpRslt.getString(2);
+      List<String> logs = stmt.getQueryLog(false, 10000);
+      stmt.close();
+      LOG.info("Query_Log for Bootstrap Dump");
+      verifyFetchedLog(logs, expectedBootstrapDumpLogs);
+
+      String[] expectedBootstrapLoadLogs = {
+          "REPL::START",
+          "REPL::TABLE_LOAD",
+          "REPL::END"
+      };
+
+      // Bootstrap load
+      stmt = (HiveStatement) con.createStatement();
+      stmt.execute("repl load " + replicaDb + " from '" + dumpLocation + "'");
+      logs = stmt.getQueryLog(false, 10000);
+      stmt.close();
+      LOG.info("Query_Log for Bootstrap Load");
+      verifyFetchedLog(logs, expectedBootstrapLoadLogs);
+
+      // Perform operation for incremental replication
+      stmt = (HiveStatement) con.createStatement();
+      stmt.execute("insert into " + primaryTblName + " values (3), (4)");
+      stmt.close();
+
+      // Test query logs for incremental dump and load
+      String[] expectedIncrementalDumpLogs = {
+          "REPL::START",
+          "REPL::EVENT_DUMP",
+          "REPL::END"
+      };
+
+      // Incremental dump
+      stmt = (HiveStatement) con.createStatement();
+      advanceDumpDir();
+      replDumpRslt = stmt.executeQuery("repl dump " + primaryDb + " from " + lastReplId
+          + " with ('hive.repl.rootdir' = '" + replDir + "')");
+      assertTrue(replDumpRslt.next());
+      dumpLocation = replDumpRslt.getString(1);
+      lastReplId = replDumpRslt.getString(2);
+      logs = stmt.getQueryLog(false, 10000);
+      stmt.close();
+      LOG.info("Query_Log for Incremental Dump");
+      verifyFetchedLog(logs, expectedIncrementalDumpLogs);
+
+      String[] expectedIncrementalLoadLogs = {
+          "REPL::START",
+          "REPL::EVENT_LOAD",
+          "REPL::END"
+      };
+
+      // Incremental load
+      stmt = (HiveStatement) con.createStatement();
+      stmt.execute("repl load " + replicaDb + " from '" + dumpLocation + "'");
+      logs = stmt.getQueryLog(false, 10000);
+      LOG.info("Query_Log for Incremental Load");
+      verifyFetchedLog(logs, expectedIncrementalLoadLogs);
+    } finally {
+      fs.delete(replDir, true);
+      // DB cleanup
+      stmt.execute("drop database if exists " + primaryDb + " cascade");
+      stmt.execute("drop database if exists " + replicaDb + " cascade");
+      stmt.execute("set hive.exec.parallel = false");
+      stmt.execute("set hive.server2.logging.operation.level = verbose");
+      stmt.execute("set hive.metastore.dml.events = false");
+      stmt.execute("set hive.metastore.transactional.event.listeners = ");
+      stmt.close();
+    }
+  }
+
   /**
    * Test getting query log when HS2 disable logging.
    *
@@ -2820,6 +2944,7 @@ private void verifyFetchedLog(List<String> logs, String[] expectedLogs) {
     }
     String accumulatedLogs = stringBuilder.toString();
     for (String expectedLog : expectedLogs) {
+      LOG.info("Checking match for " + expectedLog);
       assertTrue(accumulatedLogs.contains(expectedLog));
     }
   }
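The test above fetches logs with HiveStatement.getQueryLog(boolean incremental, int fetchSize) after each REPL command finishes. For readers unfamiliar with that API, here is a minimal client-side sketch of the same polling pattern the existing run() thread uses; the JDBC URL and the REPL command are placeholders, and error handling is deliberately thin.

// Minimal sketch: poll HiveServer2 operation logs while a statement runs.
// URL and command are placeholders, not values from this patch.
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.List;

import org.apache.hive.jdbc.HiveStatement;

public class QueryLogPoller {
  public static void main(String[] args) throws Exception {
    try (Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")) {
      HiveStatement stmt = (HiveStatement) con.createStatement();
      Thread poller = new Thread(() -> {
        try {
          while (stmt.hasMoreLogs()) {
            // incremental=true returns only the lines produced since the last call
            List<String> lines = stmt.getQueryLog(true, 1000);
            for (String line : lines) {
              System.out.println(line); // watch for REPL::START / REPL::TABLE_DUMP / REPL::END
            }
            Thread.sleep(500);
          }
        } catch (Exception e) {
          // log polling is best-effort in this sketch
        }
      });
      poller.start();
      stmt.execute("repl dump some_db"); // placeholder REPL command
      poller.join();
      stmt.close();
    }
  }
}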
Error message: " + e.getMessage()); fail("error in getting log thread"); } } @@ -2793,6 +2797,126 @@ public void run() { verifyFetchedLog(incrementalLogs, expectedLogs); } + private static int next = 0; + private synchronized void advanceDumpDir() { + next++; + ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); + } + + /** + * Test getting query log method in Jdbc for REPL commands + * @throws Exception + */ + @Test + public void testGetQueryLogForReplCommands() throws Exception { + // Prepare + String primaryDb = testName.getMethodName() + "_" + System.currentTimeMillis(); + String replicaDb = primaryDb + "_replica"; + String primaryTblName = primaryDb + ".t1"; + Path replDir = new Path(conf.get("test.data.files")); + HiveStatement stmt = (HiveStatement) con.createStatement(); + assertNotNull("Statement is null", stmt); + + replDir = new Path(replDir, primaryDb + "_repl"); + FileSystem fs = FileSystem.get(replDir.toUri(), conf); + fs.mkdirs(replDir); + + try { + // Prepare + stmt.execute("set hive.exec.parallel = true"); + stmt.execute("set hive.server2.logging.operation.level = execution"); + stmt.execute("set hive.metastore.transactional.event.listeners =" + + " org.apache.hive.hcatalog.listener.DbNotificationListener"); + stmt.execute("set hive.metastore.dml.events = true"); + stmt.execute("create database " + primaryDb + " with dbproperties('repl.source.for'='1,2,3')"); + stmt.execute("create table " + primaryTblName + " (id int)"); + stmt.execute("insert into " + primaryTblName + " values (1), (2)"); + stmt.close(); + + // Test query logs for bootstrap dump and load + String[] expectedBootstrapDumpLogs = { + "REPL::START", + "REPL::TABLE_DUMP", + "REPL::END" + }; + + // Bootstrap dump + stmt = (HiveStatement) con.createStatement(); + advanceDumpDir(); + ResultSet replDumpRslt = stmt.executeQuery("repl dump " + primaryDb + + " with ('hive.repl.rootdir' = '" + replDir + "')"); + assertTrue(replDumpRslt.next()); + String dumpLocation = replDumpRslt.getString(1); + String lastReplId = replDumpRslt.getString(2); + List logs = stmt.getQueryLog(false, 10000); + stmt.close(); + LOG.info("Query_Log for Bootstrap Dump"); + verifyFetchedLog(logs, expectedBootstrapDumpLogs); + + String[] expectedBootstrapLoadLogs = { + "REPL::START", + "REPL::TABLE_LOAD", + "REPL::END" + }; + + // Bootstrap load + stmt = (HiveStatement) con.createStatement(); + stmt.execute("repl load " + replicaDb + " from '" + dumpLocation + "'"); + logs = stmt.getQueryLog(false, 10000); + stmt.close(); + LOG.info("Query_Log for Bootstrap Load"); + verifyFetchedLog(logs, expectedBootstrapLoadLogs); + + // Perform operation for incremental replication + stmt = (HiveStatement) con.createStatement(); + stmt.execute("insert into " + primaryTblName + " values (3), (4)"); + stmt.close(); + + // Test query logs for incremental dump and load + String[] expectedIncrementalDumpLogs = { + "REPL::START", + "REPL::EVENT_DUMP", + "REPL::END" + }; + + // Incremental dump + stmt = (HiveStatement) con.createStatement(); + advanceDumpDir(); + replDumpRslt = stmt.executeQuery("repl dump " + primaryDb + " from " + lastReplId + + " with ('hive.repl.rootdir' = '" + replDir + "')"); + assertTrue(replDumpRslt.next()); + dumpLocation = replDumpRslt.getString(1); + lastReplId = replDumpRslt.getString(2); + logs = stmt.getQueryLog(false, 10000); + stmt.close(); + LOG.info("Query_Log for Incremental Dump"); + verifyFetchedLog(logs, expectedIncrementalDumpLogs); + + String[] expectedIncrementalLoadLogs = { + "REPL::START", + "REPL::EVENT_LOAD", 
+ "REPL::END" + }; + + // Incremental load + stmt = (HiveStatement) con.createStatement(); + stmt.execute("repl load " + replicaDb + " from '" + dumpLocation + "'"); + logs = stmt.getQueryLog(false, 10000); + LOG.info("Query_Log for Incremental Load"); + verifyFetchedLog(logs, expectedIncrementalLoadLogs); + } finally { + fs.delete(replDir, true); + // DB cleanup + stmt.execute("drop database if exists " + primaryDb + " cascade"); + stmt.execute("drop database if exists " + replicaDb + " cascade"); + stmt.execute("set hive.exec.parallel = false"); + stmt.execute("set hive.server2.logging.operation.level = verbose"); + stmt.execute("set hive.metastore.dml.events = false"); + stmt.execute("set hive.metastore.transactional.event.listeners = "); + stmt.close(); + } + } + /** * Test getting query log when HS2 disable logging. * @@ -2820,6 +2944,7 @@ private void verifyFetchedLog(List logs, String[] expectedLogs) { } String accumulatedLogs = stringBuilder.toString(); for (String expectedLog : expectedLogs) { + LOG.info("Checking match for " + expectedLog); assertTrue(accumulatedLogs.contains(expectedLog)); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java index 6f6d721515..13010aedb6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.session.SessionState; @@ -74,6 +75,7 @@ public boolean isRunning() { @Override public void run() { + LogUtils.registerLoggingContext(tsk.getConf()); runner = Thread.currentThread(); try { SessionState.start(ss); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 2309fc9b9d..84bdcd7025 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -380,7 +380,7 @@ private void createEndReplLogTask(Context context, Scope scope, dbProps = dbInMetadata.getParameters(); } ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbProps); - Task replLogTask = TaskFactory.get(replLogWork); + Task replLogTask = TaskFactory.get(replLogWork, conf); if (scope.rootTasks.isEmpty()) { scope.rootTasks.add(replLogTask); } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java index 7daa850b79..845aad14e3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java @@ -48,4 +48,11 @@ public StageType getType() { public String getName() { return "REPL_STATE_LOG"; } + + @Override + public boolean canExecuteInParallel() { + // ReplStateLogTask is executed only when all its parents are done with execution. So running it in parallel has no + // benefits. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
index a7c8ca4558..7f981fd969 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
@@ -68,7 +68,7 @@ public LoadFunction(Context context, ReplLogger replLogger, FunctionEvent event,
 
   private void createFunctionReplLogTask(List<Task<? extends Serializable>> functionTasks,
                                          String functionName) {
     ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, functionName);
-    Task<? extends Serializable> replLogTask = TaskFactory.get(replLogWork);
+    Task<? extends Serializable> replLogTask = TaskFactory.get(replLogWork, context.hiveConf);
     DAGTraversal.traverse(functionTasks, new AddDependencyToLeaves(replLogTask));
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 4ed215ce2f..13de791fb3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -142,7 +142,7 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP
         ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger,
             dir.getPath().getName(), eventDmd.getDumpType().toString());
-        Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork);
+        Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf);
         AddDependencyToLeaves function = new AddDependencyToLeaves(barrierTask);
         DAGTraversal.traverse(evTasks, function);
         this.log.debug("Updated taskChainTail from {}:{} to {}:{}",
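Both call sites above hang the ReplStateLogWork barrier off every leaf of the generated task DAG via DAGTraversal and AddDependencyToLeaves, so the state-log task fires only once all of its parents complete. A self-contained sketch of that pattern; Node and the recursion are illustrative, and Hive's real DAGTraversal additionally tracks visited nodes so shared subtrees in a general DAG are not processed twice.

// Illustrative sketch of "add dependency to leaves": attach a barrier node
// under every leaf of a task tree. Assumes a tree; Hive's DAGTraversal
// handles DAGs with shared children via visited-node tracking.
import java.util.ArrayList;
import java.util.List;

class Node {
  final String name;
  final List<Node> children = new ArrayList<>();
  Node(String name) { this.name = name; }
}

class AddDependencyToLeavesSketch {
  static void addToLeaves(List<Node> nodes, Node barrier) {
    for (Node n : nodes) {
      if (n.children.isEmpty()) {
        n.children.add(barrier); // leaf: barrier now depends on it
      } else {
        addToLeaves(n.children, barrier);
      }
    }
  }

  public static void main(String[] args) {
    Node root = new Node("load_table");
    root.children.add(new Node("move_data"));
    Node barrier = new Node("repl_state_log");
    addToLeaves(List.of(root), barrier);
    System.out.println(root.children.get(0).children.get(0).name); // prints repl_state_log
  }
}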