diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index b19c1aa..9496df0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -231,8 +231,6 @@ private Tuple replDumpDb(String dbName, String fromReplID, String toReplID, Stri
   }
 
   private void loadAndVerify(String replDbName, String dumpLocation, String lastReplId) throws IOException {
-    run("EXPLAIN REPL LOAD " + replDbName + " FROM '" + dumpLocation + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + replDbName + " FROM '" + dumpLocation + "'", driverMirror);
     verifyRun("REPL STATUS " + replDbName, lastReplId, driverMirror);
     return;
@@ -333,8 +331,6 @@ public void testBasicWithCM() throws Exception {
 
     // Partition droppped after "repl dump"
     run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)", driver);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
 
     run("REPL STATUS " + dbName + "_dupe", driverMirror);
@@ -608,8 +604,6 @@ public void testIncrementalAdds() throws IOException {
     String incrementalDumpLocn = getResult(0,0,driver);
     String incrementalDumpId = getResult(0,1,true,driver);
     LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'", driverMirror);
 
     run("REPL STATUS " + dbName + "_dupe", driverMirror);
@@ -734,8 +728,6 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
     String incrementalDumpLocn = getResult(0, 0, driver);
     String incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
     verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data, driverMirror);
   }
@@ -785,8 +777,6 @@ public void testDrops() throws IOException {
     run("REPL DUMP " + dbName, driver);
     String replDumpLocn = getResult(0,0,driver);
     String replDumpId = getResult(0,1,true,driver);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
     verifySetup("REPL STATUS " + dbName + "_dupe", new String[]{replDumpId}, driverMirror);
 
@@ -816,8 +806,6 @@ public void testDrops() throws IOException {
     String postDropReplDumpLocn = getResult(0,0,driver);
     String postDropReplDumpId = getResult(0,1,true,driver);
     LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'", driverMirror);
 
     // verify that drops were replicated. This can either be from tables or ptns
@@ -876,8 +864,6 @@ public void testDropsWithCM() throws IOException {
     run("REPL DUMP " + dbName, driver);
     String replDumpLocn = getResult(0,0,driver);
     String replDumpId = getResult(0,1,true,driver);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
 
     run("REPL STATUS " + dbName + "_dupe", driverMirror);
@@ -922,8 +908,6 @@ public void testDropsWithCM() throws IOException {
 
     // Drop partition after dump
     run("ALTER TABLE " + dbName + ".ptned_copy DROP PARTITION(b='1')", driver);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'", driverMirror);
 
     Exception e = null;
@@ -992,8 +976,6 @@ public void testAlters() throws IOException {
     run("REPL DUMP " + dbName, driver);
     String replDumpLocn = getResult(0,0,driver);
     String replDumpId = getResult(0,1,true,driver);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
 
     run("REPL STATUS " + dbName + "_dupe", driverMirror);
@@ -1068,8 +1050,6 @@ public void testAlters() throws IOException {
     String postAlterReplDumpLocn = getResult(0,0,driver);
     String postAlterReplDumpId = getResult(0,1,true,driver);
     LOG.info("Dumped to {} with id {}->{}", postAlterReplDumpLocn, replDumpId, postAlterReplDumpId);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'", driverMirror);
 
     // Replication done, we now do the following verifications:
@@ -1162,8 +1142,6 @@ public void testIncrementalLoad() throws IOException {
     String incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
     replDumpId = incrementalDumpId;
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
 
     verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data, driverMirror);
@@ -1191,8 +1169,6 @@ public void testIncrementalLoad() throws IOException {
     incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
     replDumpId = incrementalDumpId;
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
 
     verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1, driverMirror);
@@ -1230,8 +1206,6 @@ public void testIncrementalInserts() throws IOException {
     String incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
     replDumpId = incrementalDumpId;
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
     verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver);
     verifyRun("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver);
@@ -1251,8 +1225,6 @@ public void testIncrementalInserts() throws IOException {
     incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
     replDumpId = incrementalDumpId;
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
 
     verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data_after_ins, driverMirror);
@@ -1270,7 +1242,6 @@ public void testEventTypesForDynamicAddPartitionByInsert() throws IOException {
 
     String[] ptn_data = new String[]{ "ten"};
     run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data[0] + "')", driver);
-    run("DROP TABLE " + dbName + ".ptned", driver);
 
     // Inject a behaviour where it throws exception if an INSERT event is found
     // As we dynamically add a partition through INSERT INTO cmd, it should just add ADD_PARTITION
@@ -1307,7 +1278,7 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
     eventTypeValidator.assertInjectionsPerformed(true,false);
     InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
 
-    verifyIfTableNotExist(replDbName , "ptned", metaStoreClientMirror);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1)", ptn_data, driverMirror);
   }
 
   @Test
@@ -1346,8 +1317,6 @@ public void testIncrementalInsertToPartition() throws IOException {
     String incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
     replDumpId = incrementalDumpId;
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
     verifyRun("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driver);
     verifyRun("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driver);
@@ -1368,8 +1337,6 @@ public void testIncrementalInsertToPartition() throws IOException {
     incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
     replDumpId = incrementalDumpId;
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
 
     verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite, driverMirror);
@@ -1448,8 +1415,6 @@ public void testInsertToMultiKeyPartition() throws IOException {
     String incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
     replDumpId = incrementalDumpId;
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
     verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1980) ORDER BY name", ptn_year_1980, driverMirror);
     verifyRun("SELECT name from " + dbName + "_dupe.namelist where (day=1) ORDER BY name", ptn_day_1_2, driverMirror);
@@ -1475,8 +1440,6 @@ public void testInsertToMultiKeyPartition() throws IOException {
     incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
     replDumpId = incrementalDumpId;
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
 
     verifySetup("SELECT name from " + dbName + "_dupe.namelist where (year=1990 and month=5 and day=25)", data_after_ovwrite, driverMirror);
@@ -1908,8 +1871,6 @@ public void testViewsReplication() throws IOException {
     String incrementalDumpLocn = getResult(0,0,driver);
     String incrementalDumpId = getResult(0,1,true,driver);
     LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'", driverMirror);
 
     run("REPL STATUS " + dbName + "_dupe", driverMirror);
@@ -1934,8 +1895,6 @@ public void testViewsReplication() throws IOException {
     incrementalDumpLocn = getResult(0, 0, driver);
     incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
 
     verifyRun("SELECT * from " + dbName + "_dupe.virtual_view_rename", empty, driverMirror);
@@ -1949,8 +1908,6 @@ public void testViewsReplication() throws IOException {
     incrementalDumpLocn = getResult(0, 0, driver);
     incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
     verifyRun("SHOW COLUMNS FROM " + dbName + "_dupe.virtual_view_rename", new String[] {"a", "a_"}, driverMirror);
   }
@@ -2138,8 +2095,6 @@ public void testTruncateTable() throws IOException {
     String incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
     replDumpId = incrementalDumpId;
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
     verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver);
     verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data, driverMirror);
@@ -2723,8 +2678,6 @@ public void testConstraints() throws IOException {
     String incrementalDumpLocn = getResult(0, 0, driver);
     String incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
 
     String pkName = null;
@@ -2759,8 +2712,6 @@ public void testConstraints() throws IOException {
     incrementalDumpLocn = getResult(0, 0, driver);
     incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
-    printOutput(driverMirror);
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
 
     try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 4e7c80f..adb916e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2077,13 +2077,30 @@ private void releasePlan(QueryPlan plan) {
 
   private void setQueryDisplays(List<Task<? extends Serializable>> tasks) {
     if (tasks != null) {
-      for (Task<? extends Serializable> task : tasks) {
-        task.setQueryDisplay(queryDisplay);
-        setQueryDisplays(task.getDependentTasks());
+      Set<Task<? extends Serializable>> visited = new HashSet<Task<? extends Serializable>>();
+      while (!tasks.isEmpty()) {
+        tasks = setQueryDisplays(tasks, visited);
       }
     }
   }
 
+  private List<Task<? extends Serializable>> setQueryDisplays(
+          List<Task<? extends Serializable>> tasks,
+          Set<Task<? extends Serializable>> visited) {
+    List<Task<? extends Serializable>> childTasks = new ArrayList<>();
+    for (Task<? extends Serializable> task : tasks) {
+      if (visited.contains(task)) {
+        continue;
+      }
+      task.setQueryDisplay(queryDisplay);
+      if (task.getDependentTasks() != null) {
+        childTasks.addAll(task.getDependentTasks());
+      }
+      visited.add(task);
+    }
+    return childTasks;
+  }
+
   private void logMrWarning(int mrJobs) {
     if (mrJobs <= 0 || !("mr".equals(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE)))) {
       return;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 7f3460f..5a6367b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -371,12 +371,17 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       URI fromURI, FileSystem fs, ImportTableDesc tblDesc, Table table, Warehouse wh,
       AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
       org.apache.hadoop.hive.ql.metadata.Partition ptn,
-      EximUtil.SemanticAnalyzerWrapperContext x) {
+      EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, IOException, HiveException {
     addPartitionDesc.setReplaceMode(true);
     if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){
       addPartitionDesc.setReplicationSpec(replicationSpec);
     }
-    addPartitionDesc.getPartition(0).setLocation(ptn.getLocation()); // use existing location
+    AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
+    if (ptn == null) {
+      fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec, x);
+    } else {
+      partSpec.setLocation(ptn.getLocation()); // use existing location
+    }
     return TaskFactory.get(new DDLWork(
         x.getInputs(),
         x.getOutputs(),
@@ -891,6 +896,12 @@ private static void createReplImportTasks(
               if (updatedMetadata != null) {
                 updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
               }
+            } else {
+              x.getTasks().add(alterSinglePartition(
+                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x));
+              if (updatedMetadata != null) {
+                updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+              }
             }
           } else {
            // If replicating, then the partition already existing means we need to replace, maybe, if
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 7794d3e..2ae83b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -17,7 +17,6 @@ Licensed to the Apache Software Foundation (ASF) under one
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import io.netty.util.internal.StringUtil;
 import org.antlr.runtime.tree.Tree;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
@@ -372,7 +371,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
           LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
               t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
         }
-        LOG.debug("Updated taskChainTail from {}{} to {}{}",
+        LOG.debug("Updated taskChainTail from {}:{} to {}:{}",
            taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
         taskChainTail = barrierTask;
       }
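
Note on the Driver.java hunk above: the old setQueryDisplays() recursed through getDependentTasks() with no memory of what it had already visited, so a task reachable along several paths of the task DAG appears to be re-walked once per path, and a sufficiently deep task chain risks a stack overflow. The patch replaces the recursion with a level-by-level walk guarded by a visited set. Below is a minimal, self-contained sketch of that traversal pattern; the Node class is a hypothetical stand-in for Hive's Task<? extends Serializable>, not Hive code.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class VisitedTraversalSketch {
      // Hypothetical stand-in for Hive's Task; identity-based hashCode/equals is enough here.
      static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        Node(String name) { this.name = name; }
      }

      // Walk the DAG level by level, touching each node exactly once -- the same
      // visited-set idea the patch adds to Driver.setQueryDisplays().
      static void visitAll(List<Node> roots) {
        Set<Node> visited = new HashSet<>();
        List<Node> current = roots;
        while (!current.isEmpty()) {
          List<Node> next = new ArrayList<>();
          for (Node n : current) {
            if (!visited.add(n)) {      // add() returns false if the node was already seen
              continue;
            }
            System.out.println("visiting " + n.name);   // stands in for task.setQueryDisplay(...)
            next.addAll(n.children);
          }
          current = next;
        }
      }

      public static void main(String[] args) {
        // Diamond-shaped DAG: a -> b, a -> c, b -> d, c -> d.
        Node a = new Node("a"), b = new Node("b"), c = new Node("c"), d = new Node("d");
        a.children.add(b); a.children.add(c);
        b.children.add(d); c.children.add(d);
        visitAll(Arrays.asList(a));     // "d" is visited once, not once per path
      }
    }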
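On the ImportSemanticAnalyzer.java hunks: alterSinglePartition() previously assumed the target partition already existed and reused its location; with the added null check, a partition that is missing on the target gets a location computed from the table and warehouse (via fixLocationInPartSpec()) instead, and createReplImportTasks() now schedules an alter-partition task for that case too. A rough sketch of just the location-selection branch follows; the types are hypothetical simplifications standing in for Hive's AddPartitionDesc.OnePartitionDesc and Partition, not Hive's real API.

    public class PartitionLocationSketch {
      // Hypothetical, simplified descriptors -- illustrative only.
      static class OnePartitionDesc { String location; }
      static class Partition {
        final String location;
        Partition(String location) { this.location = location; }
        String getLocation() { return location; }
      }

      // Mirrors the null check added to alterSinglePartition(): keep the existing
      // location when the partition is already present on the target, otherwise
      // fall back to a computed default (the role fixLocationInPartSpec() plays in Hive).
      static void chooseLocation(OnePartitionDesc spec, Partition ptn, String computedDefault) {
        if (ptn == null) {
          spec.location = computedDefault;
        } else {
          spec.location = ptn.getLocation();   // use existing location
        }
      }

      public static void main(String[] args) {
        OnePartitionDesc spec = new OnePartitionDesc();
        chooseLocation(spec, null, "/warehouse/repl_db/ptned/b=1");
        System.out.println(spec.location);     // missing partition -> computed default
        chooseLocation(spec, new Partition("/warehouse/existing/b=1"), "/unused/default");
        System.out.println(spec.location);     // existing partition -> its current location
      }
    }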