diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index aa58d7445c..d3254a66fe 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -489,6 +489,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         + "used in conjunction with 'hive.repl.dump.metadata.only' set to false. if 'hive.repl.dump.metadata.only' \n"
         + " is set to true then this config parameter has no effect as external table meta data is flushed \n"
         + " always by default."),
+    REPL_ENABLE_MOVE_OPTIMIZATION("hive.repl.enable.move.optimization", false,
+        "If set to true, REPL LOAD copies data files directly to the target table/partition location \n"
+            + "instead of copying them to a staging directory first and then moving them to the target location. \n"
+            + " This optimizes REPL LOAD on object stores such as S3 or WASB, where creating a directory and moving \n"
+            + " files are costly operations. On a file system like HDFS, where the move operation is atomic, this \n"
+            + " optimization should not be enabled, as it may lead to inconsistent data reads for non-ACID tables."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/UtilsForTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/UtilsForTest.java
index 2699154cc0..6207d32026 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/UtilsForTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/UtilsForTest.java
@@ -18,9 +18,6 @@
 package org.apache.hadoop.hive;
 
-import java.util.Iterator;
-import java.util.Map;
-
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
@@ -41,20 +38,4 @@ public static void setNewDerbyDbLocation(HiveConf conf, String newloc) {
         + ";create=true");
   }
 
-  /**
-   * Do the variable expansion by calling "set" on each variable.
-   * When MR jobs are run, under some circumstances they fail because
-   * the variable expansion fails after changes in Hadoop to prevent
-   * variable expansion for JobHistoryServer. So expanding them ahead
-   * so that variables like {test.tmp.dir} get expanded.
- * @param hiveConf - */ - public static void expandHiveConfParams(HiveConf hiveConf) { - Iterator> iter = hiveConf.iterator(); - while (iter.hasNext()) { - String key = iter.next().getKey(); - hiveConf.set(key, hiveConf.get(key)); - } - } - } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 3d509f3532..a9783abe10 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -3225,6 +3225,93 @@ public void testRecycleFileNonReplDatabase() throws IOException { assertTrue(fileCount == fileCountAfter); } + @Test + public void testMoveOptimizationBootstrap() throws IOException { + String name = testName.getMethodName(); + String dbName = createDB(name, driver); + String tableNameNoPart = dbName + "_no_part"; + String tableNamePart = dbName + "_part"; + + run(" use " + dbName, driver); + run("CREATE TABLE " + tableNameNoPart + " (fld int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + tableNamePart + " (fld int) partitioned by (part int) STORED AS TEXTFILE", driver); + + run("insert into " + tableNameNoPart + " values (1) ", driver); + run("insert into " + tableNameNoPart + " values (2) ", driver); + verifyRun("SELECT fld from " + tableNameNoPart , new String[]{ "1" , "2" }, driver); + + run("insert into " + tableNamePart + " partition (part=10) values (1) ", driver); + run("insert into " + tableNamePart + " partition (part=10) values (2) ", driver); + run("insert into " + tableNamePart + " partition (part=11) values (3) ", driver); + verifyRun("SELECT fld from " + tableNamePart , new String[]{ "1" , "2" , "3"}, driver); + verifyRun("SELECT fld from " + tableNamePart + " where part = 10" , new String[]{ "1" , "2"}, driver); + verifyRun("SELECT fld from " + tableNamePart + " where part = 11" , new String[]{ "3" }, driver); + + String replDbName = dbName + "_replica"; + Tuple dump = replDumpDb(dbName, null, null, null); + run("REPL LOAD " + replDbName + " FROM '" + dump.dumpLocation + + "' with ('hive.repl.enable.move.optimization'='true')", driverMirror); + verifyRun("REPL STATUS " + replDbName, dump.lastReplId, driverMirror); + + run(" use " + replDbName, driverMirror); + verifyRun("SELECT fld from " + tableNamePart , new String[]{ "1" , "2" , "3"}, driverMirror); + verifyRun("SELECT fld from " + tableNamePart + " where part = 10" , new String[]{ "1" , "2"}, driverMirror); + verifyRun("SELECT fld from " + tableNamePart + " where part = 11" , new String[]{ "3" }, driverMirror); + verifyRun("SELECT fld from " + tableNameNoPart , new String[]{ "1" , "2" }, driverMirror); + verifyRun("SELECT count(*) from " + tableNamePart , new String[]{ "3"}, driverMirror); + verifyRun("SELECT count(*) from " + tableNamePart + " where part = 10" , new String[]{ "2"}, driverMirror); + verifyRun("SELECT count(*) from " + tableNamePart + " where part = 11" , new String[]{ "1" }, driverMirror); + verifyRun("SELECT count(*) from " + tableNameNoPart , new String[]{ "2" }, driverMirror); + } + + @Test + public void testMoveOptimizationIncremental() throws IOException { + String testName = "testMoveOptimizationIncremental"; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_replica"; + + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = 
bootstrapDump.lastReplId; + + String[] unptn_data = new String[] { "eleven", "twelve" }; + + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')", driver); + verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); + + run("CREATE TABLE " + dbName + ".unptned_late AS SELECT * FROM " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver); + + Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); + run("REPL LOAD " + replDbName + " FROM '" + incrementalDump.dumpLocation + + "' with ('hive.repl.enable.move.optimization'='true')", driverMirror); + verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror); + replDumpId = incrementalDump.lastReplId; + + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data, driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".unptned ", "2", driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".unptned_late", "2", driverMirror); + + String[] unptn_data_after_ins = new String[] { "eleven", "thirteen", "twelve" }; + String[] data_after_ovwrite = new String[] { "hundred" }; + run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[1] + "')", driver); + verifySetup("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driver); + run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')", driver); + verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite, driver); + + incrementalDump = replDumpDb(dbName, replDumpId, null, null); + run("REPL LOAD " + replDbName + " FROM '" + incrementalDump.dumpLocation + + "' with ('hive.repl.enable.move.optimization'='true')", driverMirror); + verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror); + + verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned", data_after_ovwrite, driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".unptned", "1", driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".unptned_late ", "3", driverMirror); + } + private static String createDB(String name, IDriver myDriver) { LOG.info("Testing " + name); run("CREATE DATABASE " + name + " WITH DBPROPERTIES ( '" + diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index f074428dd5..e043e5446f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -141,10 +141,8 @@ public void tearDown() throws Throwable { primary.run("drop database if exists " + primaryDbName + "_extra cascade"); } - @Test - public void testAcidTablesBootstrap() throws Throwable { - WarehouseInstance.Tuple bootstrapDump = primary - .run("use " + primaryDbName) + private WarehouseInstance.Tuple prepareDataAndDump(String 
primaryDbName, String fromReplId) throws Throwable { + return primary.run("use " + primaryDbName) .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + "tblproperties (\"transactional\"=\"true\")") .run("insert into t1 values(1)") @@ -165,14 +163,15 @@ public void testAcidTablesBootstrap() throws Throwable { .run("insert into t5 values(1111), (2222)") .run("alter table t5 set tblproperties (\"transactional\"=\"true\")") .run("insert into t5 values(3333)") - .dump(primaryDbName, null); + .dump(primaryDbName, fromReplId); + } - replica.load(replicatedDbName, bootstrapDump.dumpLocation) - .run("use " + replicatedDbName) + private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable { + replica.run("use " + replicatedDbName) .run("show tables") .verifyResults(new String[] {"t1", "t2", "t3", "t4", "t5"}) .run("repl status " + replicatedDbName) - .verifyResult(bootstrapDump.lastReplicationId) + .verifyResult(lastReplId) .run("select id from t1 order by id") .verifyResults(new String[]{"1", "2"}) .run("select country from t2 order by country") @@ -185,6 +184,32 @@ public void testAcidTablesBootstrap() throws Throwable { .verifyResults(new String[] {"1111", "2222", "3333"}); } + @Test + public void testAcidTablesBootstrap() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null); + replica.load(replicatedDbName, bootstrapDump.dumpLocation); + verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId); + } + + @Test + public void testAcidTablesMoveOptimizationBootStrap() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null); + replica.load(replicatedDbName, bootstrapDump.dumpLocation, + Collections.singletonList("'hive.repl.enable.move.optimization'='true'")); + verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId); + } + + @Test + public void testAcidTablesMoveOptimizationIncremental() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, bootstrapDump.dumpLocation, + Collections.singletonList("'hive.repl.enable.move.optimization'='true'")); + WarehouseInstance.Tuple incrDump = prepareDataAndDump(primaryDbName, bootstrapDump.lastReplicationId); + replica.load(replicatedDbName, incrDump.dumpLocation, + Collections.singletonList("'hive.repl.enable.move.optimization'='true'")); + verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId); + } + @Test public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable { // Open 5 txns diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 6c45641a0d..6ee927a0a8 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -1321,6 +1322,106 @@ public Boolean apply(@Nullable CallerArguments args) { 
.verifyResult(replicatedDbName + ".testFunctionOne"); } + @Test + public void testMoveOptimizationBootstrapReplLoadRetryAfterFailureFor() throws Throwable { + String replicatedDbName_CM = replicatedDbName + "_CM"; + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .dump(primaryDbName, null); + + testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", + "ADD_PARTITION", tuple); + } + + @Test + public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Throwable { + List withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'"); + String replicatedDbName_CM = replicatedDbName + "_CM"; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("create table t1 (place string) partitioned by (country string)") + .dump(primaryDbName, null); + replica.load(replicatedDbName, tuple.dumpLocation, withConfigs); + replica.load(replicatedDbName_CM, tuple.dumpLocation, withConfigs); + replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") + .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + + tuple = primary.run("use " + primaryDbName) + .run("insert overwrite table t1 select * from t2") + .dump(primaryDbName, tuple.lastReplicationId); + + testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "INSERT", tuple); + } + + @Test + public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable { + List withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'"); + String replicatedDbName_CM = replicatedDbName + "_CM"; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table t2 (place string) partitioned by (country string)") + .run("ALTER TABLE t2 ADD PARTITION (country='india')") + .dump(primaryDbName, null); + replica.load(replicatedDbName, tuple.dumpLocation, withConfigs); + replica.load(replicatedDbName_CM, tuple.dumpLocation, withConfigs); + replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") + .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + + tuple = primary.run("use " + primaryDbName) + .run("insert into table t2 partition(country='india') values ('bangalore')") + .dump(primaryDbName, tuple.lastReplicationId); + + testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", "INSERT", tuple); + } + + private void testMoveOptimization(String primarydb, String replicadb, String replicatedDbName_CM, + String tbl, String eventType, WarehouseInstance.Tuple tuple) throws Throwable { + List withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'"); + + // fail add notification for given event type. 
+ BehaviourInjection callerVerifier + = new BehaviourInjection() { + @Nullable + @Override + public Boolean apply(@Nullable NotificationEvent entry) { + if (entry.getEventType().equalsIgnoreCase(eventType) && entry.getTableName().equalsIgnoreCase(tbl)) { + injectionPathCalled = true; + LOG.warn("Verifier - DB: " + String.valueOf(entry.getDbName()) + + " Table: " + String.valueOf(entry.getTableName()) + + " Event: " + String.valueOf(entry.getEventType())); + return false; + } + return true; + } + }; + + InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier); + replica.loadFailure(replicadb, tuple.dumpLocation, withConfigs); + InjectableBehaviourObjectStore.resetAddNotificationModifier(); + callerVerifier.assertInjectionsPerformed(true, false); + replica.load(replicadb, tuple.dumpLocation, withConfigs); + + replica.run("use " + replicadb) + .run("select country from " + tbl) + .verifyResults(Arrays.asList("india")); + + primary.run("use " + primarydb) + .run("drop table " + tbl); + + InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier); + replica.loadFailure(replicatedDbName_CM, tuple.dumpLocation, withConfigs); + InjectableBehaviourObjectStore.resetAddNotificationModifier(); + callerVerifier.assertInjectionsPerformed(true, false); + replica.load(replicatedDbName_CM, tuple.dumpLocation, withConfigs); + + replica.run("use " + replicatedDbName_CM) + .run("select country from " + tbl) + .verifyResults(Arrays.asList("india")) + .run(" drop database if exists " + replicatedDbName_CM + " cascade"); + } + @Test public void testDumpExternalTableSetFalse() throws Throwable { WarehouseInstance.Tuple tuple = primary diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java index 819838d091..339c4aebab 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java @@ -18,7 +18,6 @@ package org.apache.hive.service.cli; -import org.apache.hadoop.hive.UtilsForTest; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIService; @@ -40,7 +39,6 @@ public static void setUpBeforeClass() throws Exception { HiveConf conf = new HiveConf(); conf.setBoolean("datanucleus.schema.autoCreateTables", true); conf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); - UtilsForTest.expandHiveConfParams(conf); service.init(conf); client = new ThriftCLIServiceClient(service); } diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index e5b100c549..e4ac0a927e 100644 --- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -22,7 +22,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -179,11 +178,6 @@ public MiniHS2 build() throws Exception { if (miniClusterType == MiniClusterType.MR && useMiniKdc) { throw new IOException("Can't create secure miniMr ... 
yet"); } - Iterator> iter = hiveConf.iterator(); - while (iter.hasNext()) { - String key = iter.next().getKey(); - hiveConf.set(key, hiveConf.get(key)); - } if (isHTTPTransMode) { hiveConf.setVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE, HS2_HTTP_MODE); } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 827721f3e8..c1cc6335de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -333,6 +333,9 @@ public int execute(DriverContext driverContext) { for (int i = 0; i implements Serializable { private static final long serialVersionUID = 1L; @@ -61,7 +67,10 @@ protected int execute(DriverContext driverContext) { LOG.debug("ReplCopyTask.execute()"); FileSystem dstFs = null; Path toPath = null; + try { + Hive hiveDb = Hive.get(); + // Note: CopyWork supports copying multiple files, but ReplCopyWork doesn't. // Not clear of ReplCopyWork should inherit from CopyWork. if (work.getFromPaths().length > 1 || work.getToPaths().length > 1) { @@ -136,6 +145,15 @@ protected int execute(DriverContext driverContext) { } LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size()); + + // in case of move optimization, file is directly copied to destination. So we need to clear the old content, if + // its a replace (insert overwrite ) operation. + if (work.getDeleteDestIfExist() && dstFs.exists(toPath)) { + LOG.debug(" path " + toPath + " is cleaned before renaming"); + hiveDb.cleanUpOneDirectoryForReplace(toPath, dstFs, HIDDEN_FILES_PATH_FILTER, conf, work.getNeedRecycle(), + work.getIsAutoPerge()); + } + if (!FileUtils.mkdir(dstFs, toPath, conf)) { console.printError("Cannot make target directory: " + toPath.toString()); return 2; @@ -156,6 +174,14 @@ protected int execute(DriverContext driverContext) { if (dstFs.exists(destFile)) { String destFileWithSourceName = srcFile.getSourcePath().getName(); Path newDestFile = new Path(destRoot, destFileWithSourceName); + + // if the new file exist then delete it before renaming, to avoid rename failure. If the copy is done + // directly to table path (bypassing staging directory) then there might be some stale files from previous + // incomplete/failed load. No need of recycle as this is a case of stale file. 
+ if (dstFs.exists(newDestFile)) { + LOG.debug(" file " + newDestFile + " is deleted before renaming"); + dstFs.delete(newDestFile, true); + } boolean result = dstFs.rename(destFile, newDestFile); if (!result) { throw new IllegalStateException( @@ -223,11 +249,17 @@ public String getName() { return "REPL_COPY"; } - public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) { + public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, + HiveConf conf, boolean isAutoPurge, boolean needRecycle) { Task copyTask = null; LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath); if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); + if (replicationSpec.isReplace() && conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { + rcwork.setDeleteDestIfExist(true); + rcwork.setAutoPurge(isAutoPurge); + rcwork.setNeedRecycle(needRecycle); + } LOG.debug("ReplCopyTask:\trcwork"); if (replicationSpec.isLazy()) { LOG.debug("ReplCopyTask:\tlazy"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 01dd93c527..1de782a756 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -27,7 +27,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.commons.lang.StringUtils; @@ -173,6 +175,12 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext CompilationOpContext opContext) { super.initialize(queryState, queryPlan, driverContext, opContext); + Iterator> iter = conf.iterator(); + while(iter.hasNext()) { + String key = iter.next().getKey(); + conf.set(key, conf.get(key)); + } + job = new JobConf(conf, ExecDriver.class); initializeFiles("tmpjars", getResource(conf, SessionState.ResourceType.JAR)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index c0cfc439d2..45b674e287 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -63,6 +63,7 @@ import java.util.List; import java.util.Map; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.partSpecToString; @@ -224,14 +225,26 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti + partSpecToString(partSpec.getPartSpec()) + " with source location: " + partSpec.getLocation()); - Path tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); + Path tmpPath = replicaWarehousePartitionLocation; + + // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir. 
+ LoadFileType loadFileType; + if (event.replicationSpec().isInReplicationScope() && + context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { + loadFileType = LoadFileType.IGNORE; + } else { + loadFileType = + event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); + } + Task copyTask = ReplCopyTask.getLoadCopyTask( event.replicationSpec(), sourceWarehousePartitionLocation, tmpPath, - context.hiveConf + context.hiveConf, false, false ); - Task movePartitionTask = movePartitionTask(table, partSpec, tmpPath); + Task movePartitionTask = movePartitionTask(table, partSpec, tmpPath, loadFileType); // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for // bootstrap, we skip current partition update. @@ -257,7 +270,8 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti /** * This will create the move of partition data from temp path to actual path */ - private Task movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec, Path tmpPath) { + private Task movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec, Path tmpPath, + LoadFileType loadFileType) { MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( @@ -268,7 +282,7 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti } else { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), - event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, 0L + loadFileType, 0L ); loadTableWork.setInheritTableSpecs(false); moveWork.setLoadTableWork(loadTableWork); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 089b529b7d..82f687b7d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -59,6 +59,7 @@ import java.util.List; import java.util.TreeMap; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; public class LoadTable { @@ -218,9 +219,25 @@ private String location(ImportTableDesc tblDesc, Database parentDb) private Task loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath, Path fromURI) { Path dataPath = new Path(fromURI, EximUtil.DATA_PATH_NAME); - Path tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo); - Task copyTask = - ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf); + Path tmpPath = tgtPath; + + // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir. + LoadFileType loadFileType; + if (replicationSpec.isInReplicationScope() && + context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { + loadFileType = LoadFileType.IGNORE; + } else { + loadFileType = + replicationSpec.isReplace() ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo); + } + + LOG.debug("adding dependent CopyWork/AddPart/MoveWork for table " + + table.getCompleteName() + " with source location: " + + dataPath.toString() + " and target location " + tmpPath.toString()); + + Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf, + false, false); MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { @@ -232,7 +249,7 @@ private String location(ImportTableDesc tblDesc, Database parentDb) } else { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), - replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, 0L + loadFileType, 0L ); moveWork.setLoadTableWork(loadTableWork); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 0b1048c589..50e8c14ac0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1886,7 +1886,8 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par // to ACID updates. So the are not themselves ACID. // Note: this assumes both paths are qualified; which they are, currently. - if ((isMmTableWrite || isFullAcidTable) && loadPath.equals(newPartPath)) { + if (((isMmTableWrite || isFullAcidTable) && loadPath.equals(newPartPath)) || + (loadFileType == LoadFileType.IGNORE)) { // MM insert query, move itself is a no-op. if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)"); @@ -2497,7 +2498,7 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType } // Note: this assumes both paths are qualified; which they are, currently. - if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) { + if (((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) || (loadFileType == LoadFileType.IGNORE)) { /** * some operations on Transactional tables (e.g. Import) write directly to the final location * and avoid the 'move' operation. 
Since MoveTask does other things, setting 'loadPath' to be @@ -4446,7 +4447,7 @@ private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, } - private void cleanUpOneDirectoryForReplace(Path path, FileSystem fs, + public void cleanUpOneDirectoryForReplace(Path path, FileSystem fs, PathFilter pathFilter, HiveConf conf, boolean purge, boolean isNeedRecycle) throws IOException, HiveException { if (isNeedRecycle && conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) { recycleDirToCmPath(path, purge); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePointLookupOptimizerRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePointLookupOptimizerRule.java index 4d42ab4b9d..eff9a312aa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePointLookupOptimizerRule.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePointLookupOptimizerRule.java @@ -19,8 +19,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -46,19 +44,22 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIn; +import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; import com.google.common.collect.Sets; + public abstract class HivePointLookupOptimizerRule extends RelOptRule { /** @@ -73,7 +74,6 @@ public FilterCondition (int minNumORClauses) { super(operand(Filter.class, any()), minNumORClauses); } - @Override public void onMatch(RelOptRuleCall call) { final Filter filter = call.rel(0); final RexBuilder rexBuilder = filter.getCluster().getRexBuilder(); @@ -93,13 +93,12 @@ public void onMatch(RelOptRuleCall call) { * to generate an IN clause (which is more efficient). If the OR operator contains * AND operator children, the optimization might generate an IN clause that uses * structs. - */ + */ public static class JoinCondition extends HivePointLookupOptimizerRule { public JoinCondition (int minNumORClauses) { super(operand(Join.class, any()), minNumORClauses); } - - @Override + public void onMatch(RelOptRuleCall call) { final Join join = call.rel(0); final RexBuilder rexBuilder = join.getCluster().getRexBuilder(); @@ -133,7 +132,7 @@ protected HivePointLookupOptimizerRule( public void analyzeCondition(RelOptRuleCall call, RexBuilder rexBuilder, - AbstractRelNode node, + AbstractRelNode node, RexNode condition) { // 1. 
We try to transform possible candidates @@ -174,31 +173,29 @@ public void analyzeCondition(RelOptRuleCall call, @Override public RexNode visitCall(RexCall call) { RexNode node; switch (call.getKind()) { - // FIXME: I don't think there is a need for this right now...calcite have already done the flattening/etc - // removing this case clause will not miss the OR below AND - case AND: - ImmutableList operands = RexUtil.flattenAnd(call.getOperands()); - List newOperands = new ArrayList(); - for (RexNode operand : operands) { - RexNode newOperand; - if (operand.getKind() == SqlKind.OR) { - try { - newOperand = transformIntoInClauseCondition(rexBuilder, - nodeOp.getRowType(), operand, minNumORClauses); - if (newOperand == null) { - newOperand = operand; + case AND: + ImmutableList operands = RexUtil.flattenAnd(((RexCall) call).getOperands()); + List newOperands = new ArrayList(); + for (RexNode operand: operands) { + RexNode newOperand; + if (operand.getKind() == SqlKind.OR) { + try { + newOperand = transformIntoInClauseCondition(rexBuilder, + nodeOp.getRowType(), operand, minNumORClauses); + if (newOperand == null) { + newOperand = operand; + } + } catch (SemanticException e) { + LOG.error("Exception in HivePointLookupOptimizerRule", e); + return call; } - } catch (SemanticException e) { - LOG.error("Exception in HivePointLookupOptimizerRule", e); - return call; + } else { + newOperand = operand; } - } else { - newOperand = operand; + newOperands.add(newOperand); } - newOperands.add(newOperand); - } - node = RexUtil.composeConjunction(rexBuilder, newOperands, false); - break; + node = RexUtil.composeConjunction(rexBuilder, newOperands, false); + break; case OR: try { node = transformIntoInClauseCondition(rexBuilder, @@ -217,184 +214,106 @@ public void analyzeCondition(RelOptRuleCall call, return node; } - /** - * Represents a simple contraint. - * - * Example: a=1 - */ - static class Constraint { - - private RexLiteral literal; - private RexInputRef inputRef; - - public Constraint(RexInputRef inputRef, RexLiteral literal) { - this.literal = literal; - this.inputRef = inputRef; - } - - /** - * Interprets argument as a constraint; if not possible returns null. - */ - public static Constraint of(RexNode n) { - if (!(n instanceof RexCall)) { - return null; - } - RexCall call = (RexCall) n; - if (call.getOperator().getKind() != SqlKind.EQUALS) { - return null; - } - RexNode opA = call.operands.get(0); - RexNode opB = call.operands.get(1); - if (opA instanceof RexLiteral && opB instanceof RexInputRef) { - RexLiteral rexLiteral = (RexLiteral) opA; - RexInputRef rexInputRef = (RexInputRef) opB; - return new Constraint(rexInputRef, rexLiteral); - } - if (opA instanceof RexInputRef && opB instanceof RexLiteral) { - RexLiteral rexLiteral = (RexLiteral) opB; - RexInputRef rexInputRef = (RexInputRef) opA; - return new Constraint(rexInputRef, rexLiteral); - } - return null; - } - - public RexInputRef getKey() { - return inputRef; - } - - } - - /** - * A group of Constraints. - * - * Examples: - * (a=1 && b=1) - * (a=1) - * - * Note: any rexNode is accepted as constraint; but it might be keyed with the empty key; - * which means it can't be parsed as a constraint for some reason; but for completeness... 
- * - */ - static class ConstraintGroup { - - public static final Function> KEY_FUNCTION = new Function>() { - - @Override - public Set apply(ConstraintGroup a) { - return a.key; - } - }; - private Map constraints = new HashMap<>(); - private RexNode originalRexNode; - private final Set key; - - public ConstraintGroup(RexNode rexNode) { - originalRexNode = rexNode; - - final List conjunctions = RelOptUtil.conjunctions(rexNode); - - for (RexNode n : conjunctions) { - - Constraint c = Constraint.of(n); - if (c == null) { - // interpretation failed; make this node opaque - key = Collections.emptySet(); - return; - } - constraints.put(c.getKey(), c); - } - if (constraints.size() != conjunctions.size()) { - LOG.debug("unexpected situation; giving up on this branch"); - key = Collections.emptySet(); - return; - } - key = constraints.keySet(); - } - - public List getValuesInOrder(List columns) throws SemanticException { - List ret = new ArrayList<>(); - for (RexInputRef rexInputRef : columns) { - Constraint constraint = constraints.get(rexInputRef); - if (constraint == null) { - throw new SemanticException("Unable to find constraint which was earlier added."); - } - ret.add(constraint.literal); - } - return ret; - } - } - - private RexNode transformIntoInClauseCondition(RexBuilder rexBuilder, RelDataType inputSchema, + private static RexNode transformIntoInClauseCondition(RexBuilder rexBuilder, RelDataType inputSchema, RexNode condition, int minNumORClauses) throws SemanticException { assert condition.getKind() == SqlKind.OR; + // 1. We extract the information necessary to create the predicate for the new + // filter + ListMultimap columnConstantsMap = ArrayListMultimap.create(); ImmutableList operands = RexUtil.flattenOr(((RexCall) condition).getOperands()); if (operands.size() < minNumORClauses) { // We bail out return null; } - List allNodes = new ArrayList<>(); - List processedNodes = new ArrayList<>(); for (int i = 0; i < operands.size(); i++) { - ConstraintGroup m = new ConstraintGroup(operands.get(i)); - allNodes.add(m); - } - - Multimap, ConstraintGroup> assignmentGroups = - Multimaps.index(allNodes, ConstraintGroup.KEY_FUNCTION); - - for (Entry, Collection> sa : assignmentGroups.asMap().entrySet()) { - // skip opaque - if (sa.getKey().size() == 0) { - continue; - } - // not enough equalities should not be handled - if (sa.getValue().size() < 2 || sa.getValue().size() < minNumORClauses) { - continue; + final List conjunctions = RelOptUtil.conjunctions(operands.get(i)); + for (RexNode conjunction: conjunctions) { + // 1.1. If it is not a RexCall, we bail out + if (!(conjunction instanceof RexCall)) { + return null; + } + // 1.2. 
We extract the information that we need + RexCall conjCall = (RexCall) conjunction; + if(conjCall.getOperator().getKind() == SqlKind.EQUALS) { + if (conjCall.operands.get(0) instanceof RexInputRef && + conjCall.operands.get(1) instanceof RexLiteral) { + RexInputRef ref = (RexInputRef) conjCall.operands.get(0); + RexLiteral literal = (RexLiteral) conjCall.operands.get(1); + columnConstantsMap.put(ref, literal); + if (columnConstantsMap.get(ref).size() != i+1) { + // If we have not added to this column before, we bail out + return null; + } + } else if (conjCall.operands.get(1) instanceof RexInputRef && + conjCall.operands.get(0) instanceof RexLiteral) { + RexInputRef ref = (RexInputRef) conjCall.operands.get(1); + RexLiteral literal = (RexLiteral) conjCall.operands.get(0); + columnConstantsMap.put(ref, literal); + if (columnConstantsMap.get(ref).size() != i+1) { + // If we have not added to this column before, we bail out + return null; + } + } else { + // Bail out + return null; + } + } else { + return null; + } } - - allNodes.add(new ConstraintGroup(buildInFor(sa.getKey(), sa.getValue()))); - processedNodes.addAll(sa.getValue()); } - if (processedNodes.isEmpty()) { - return null; - } - allNodes.removeAll(processedNodes); - List ops = new ArrayList<>(); - for (ConstraintGroup mx : allNodes) { - ops.add(mx.originalRexNode); - } - if (ops.size() == 1) { - return ops.get(0); - } else { - return rexBuilder.makeCall(SqlStdOperatorTable.OR, ops); - } - - } - - private RexNode buildInFor(Set set, Collection value) throws SemanticException { - + // 3. We build the new predicate and return it + List newOperands = new ArrayList(operands.size()); + // 3.1 Create structs List columns = new ArrayList(); - columns.addAll(set); - Listoperands = new ArrayList<>(); - - operands.add(useStructIfNeeded(columns)); - for (ConstraintGroup node : value) { - List values = node.getValuesInOrder(columns); - operands.add(useStructIfNeeded(values)); - } + List names = new ArrayList(); + ImmutableList.Builder paramsTypes = ImmutableList.builder(); + List structReturnType = new ArrayList(); + ImmutableList.Builder newOperandsTypes = ImmutableList.builder(); + for (int i = 0; i < operands.size(); i++) { + List constantFields = new ArrayList(operands.size()); - return rexBuilder.makeCall(HiveIn.INSTANCE, operands); - } + for (RexInputRef ref : columnConstantsMap.keySet()) { + // If any of the elements was not referenced by every operand, we bail out + if (columnConstantsMap.get(ref).size() <= i) { + return null; + } + RexLiteral columnConstant = columnConstantsMap.get(ref).get(i); + if (i == 0) { + columns.add(ref); + names.add(inputSchema.getFieldNames().get(ref.getIndex())); + paramsTypes.add(ref.getType()); + structReturnType.add(TypeConverter.convert(ref.getType())); + } + constantFields.add(columnConstant); + } - private RexNode useStructIfNeeded(List columns) { - // Create STRUCT clause - if (columns.size() == 1) { - return columns.get(0); - } else { - return rexBuilder.makeCall(SqlStdOperatorTable.ROW, columns); + if (i == 0) { + RexNode columnsRefs; + if (columns.size() == 1) { + columnsRefs = columns.get(0); + } else { + // Create STRUCT clause + columnsRefs = rexBuilder.makeCall(SqlStdOperatorTable.ROW, columns); + } + newOperands.add(columnsRefs); + newOperandsTypes.add(columnsRefs.getType()); + } + RexNode values; + if (constantFields.size() == 1) { + values = constantFields.get(0); + } else { + // Create STRUCT clause + values = rexBuilder.makeCall(SqlStdOperatorTable.ROW, constantFields); + } + 
newOperands.add(values); + newOperandsTypes.add(values.getType()); } + + // 4. Create and return IN clause + return rexBuilder.makeCall(HiveIn.INSTANCE, newOperands); } } @@ -418,7 +337,7 @@ private RexNode useStructIfNeeded(List columns) { switch (call.getKind()) { case AND: // IN clauses need to be combined by keeping only common elements - operands = Lists.newArrayList(RexUtil.flattenAnd(call.getOperands())); + operands = Lists.newArrayList(RexUtil.flattenAnd(((RexCall) call).getOperands())); for (int i = 0; i < operands.size(); i++) { RexNode operand = operands.get(i); if (operand.getKind() == SqlKind.IN) { @@ -455,7 +374,7 @@ private RexNode useStructIfNeeded(List columns) { break; case OR: // IN clauses need to be combined by keeping all elements - operands = Lists.newArrayList(RexUtil.flattenOr(call.getOperands())); + operands = Lists.newArrayList(RexUtil.flattenOr(((RexCall) call).getOperands())); for (int i = 0; i < operands.size(); i++) { RexNode operand = operands.get(i); if (operand.getKind() == SqlKind.IN) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java index 15958d5e8d..3788ef9882 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java @@ -2654,16 +2654,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, st.setNumRows(numRows); st.setDataSize(dataSize); - List colStatsList = st.getColumnStats(); - if(colStatsList != null) { - for (ColStatistics colStats : colStatsList) { - colStats.setNumFalses((long) (colStats.getNumFalses() * udtfFactor)); - colStats.setNumTrues((long) (colStats.getNumTrues() * udtfFactor)); - colStats.setNumNulls((long) (colStats.getNumNulls() * udtfFactor)); - } - st.setColumnStats(colStatsList); - } - if (LOG.isDebugEnabled()) { LOG.debug("[0] STATS-" + uop.toString() + ": " + st.extendedToString()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 6fbe29c5ec..fc9a95cd24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.ReplCopyTask; @@ -78,6 +79,8 @@ import java.util.Map; import java.util.TreeMap; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; + /** * ImportSemanticAnalyzer. 
* @@ -389,33 +392,50 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, - Long writeId, int stmtId) { + Long writeId, int stmtId) throws HiveException { assert table != null; assert table.getParameters() != null; Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); Path destPath = null, loadPath = null; LoadFileType lft; - if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) { - String mmSubdir = replace ? AcidUtils.baseDir(writeId) - : AcidUtils.deltaSubdir(writeId, writeId, stmtId); - destPath = new Path(tgtPath, mmSubdir); - /** - * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition - * directory, i.e. the final destination for these files. This has to be a copy to preserve - * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem. - * So setting 'loadPath' this way will make - * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean, - * boolean, Long, int)} - * skip the unnecessary file (rename) operation but it will perform other things. - */ - loadPath = tgtPath; - lft = LoadFileType.KEEP_EXISTING; + boolean isAutoPurge; + boolean needRecycle; + + if (replicationSpec.isInReplicationScope() && + x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) { + lft = LoadFileType.IGNORE; + destPath = loadPath = tgtPath; + isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge")); + if (table.isTemporary()) { + needRecycle = false; + } else { + org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName()); + needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db); + } } else { - destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath); - lft = replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) { + String mmSubdir = replace ? AcidUtils.baseDir(writeId) + : AcidUtils.deltaSubdir(writeId, writeId, stmtId); + destPath = new Path(tgtPath, mmSubdir); + /** + * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition + * directory, i.e. the final destination for these files. This has to be a copy to preserve + * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem. + * So setting 'loadPath' this way will make + * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean, + * boolean, Long, int)} + * skip the unnecessary file (rename) operation but it will perform other things. + */ + loadPath = tgtPath; + lft = LoadFileType.KEEP_EXISTING; + } else { + destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath); + lft = replace ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + } + needRecycle = false; + isAutoPurge = false; } - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("adding import work for table with source location: " + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + writeId + @@ -428,7 +448,8 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Task copyTask = null; if (replicationSpec.isInReplicationScope()) { - copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf()); + copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(), + isAutoPurge, needRecycle); } else { copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false)); } @@ -442,7 +463,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Collections.singletonList(tgtPath), true, null, null); moveWork.setMultiFilesDesc(loadFilesWork); - moveWork.setNeedCleanTarget(false); + moveWork.setNeedCleanTarget(replace); } else { LoadTableDesc loadTableWork = new LoadTableDesc( loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId); @@ -496,11 +517,14 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf()); } - private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc, + private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws MetaException, IOException, HiveException { AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); + boolean isAutoPurge; + boolean needRecycle; + if (tblDesc.isExternal() && tblDesc.getLocation() == null) { x.getLOG().debug("Importing in-place: adding AddPart for partition " + partSpecToString(partSpec.getPartSpec())); @@ -516,11 +540,30 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, + partSpecToString(partSpec.getPartSpec()) + " with source location: " + srcLocation); Path tgtLocation = new Path(partSpec.getLocation()); - //Replication scope the write id will be invalid - Boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) || - replicationSpec.isInReplicationScope(); - Path destPath = useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation) - : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId)); + + LoadFileType loadFileType = replicationSpec.isReplace() ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + Path destPath; + if (replicationSpec.isInReplicationScope() && + x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) { + loadFileType = LoadFileType.IGNORE; + destPath = tgtLocation; + isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge")); + if (table.isTemporary()) { + needRecycle = false; + } else { + org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName()); + needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db); + } + } else { + //Replication scope the write id will be invalid + Boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) || + replicationSpec.isInReplicationScope(); + destPath = useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation) + : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId)); + isAutoPurge = false; + needRecycle = false; + } + Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ? destPath : tgtLocation; if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("adding import work for partition with source location: " @@ -535,7 +578,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Task copyTask = null; if (replicationSpec.isInReplicationScope()) { copyTask = ReplCopyTask.getLoadCopyTask( - replicationSpec, new Path(srcLocation), destPath, x.getConf()); + replicationSpec, new Path(srcLocation), destPath, x.getConf(), isAutoPurge, needRecycle); } else { copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false)); } @@ -554,11 +597,11 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Collections.singletonList(tgtLocation), true, null, null); moveWork.setMultiFilesDesc(loadFilesWork); - moveWork.setNeedCleanTarget(false); + moveWork.setNeedCleanTarget(replicationSpec.isReplace()); } else { LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table), partSpec.getPartSpec(), - replicationSpec.isReplace() ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
+          loadFileType,
           writeId);
       loadTableWork.setStmtId(stmtId);
       loadTableWork.setInheritTableSpecs(false);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index e4186c45a8..e4a128182c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -40,12 +41,15 @@
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 
 import java.io.FileNotFoundException;
+import java.net.URI;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
@@ -80,6 +84,8 @@
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
   public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
 
+  private static final List<String> CLOUD_SCHEME_PREFIXES = Arrays.asList("s3a", "wasb");
+
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
     this.db = super.db;
@@ -216,6 +222,20 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
     }
   }
 
+  private boolean isCloudFS(Path filePath, org.apache.hadoop.conf.Configuration conf) throws Exception {
+    if (filePath == null) {
+      throw new HiveException("filePath cannot be null");
+    }
+
+    URI uri = filePath.toUri();
+    String scheme = uri.getScheme();
+    scheme = StringUtils.isBlank(scheme) ? FileSystem.get(uri, conf).getScheme() : scheme;
+    if (StringUtils.isBlank(scheme)) {
+      throw new HiveException("Cannot get valid scheme for " + filePath);
+    }
+    return CLOUD_SCHEME_PREFIXES.contains(scheme.toLowerCase().trim());
+  }
+
   // REPL LOAD
   private void initReplLoad(ASTNode ast) throws SemanticException {
     path = PlanUtils.stripQuotes(ast.getChild(0).getText());
@@ -302,6 +322,18 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg());
     }
 
+    // This config is set to make sure that in case of S3 replication, the move is skipped.
+    try {
+      Warehouse wh = new Warehouse(conf);
+      Path filePath = wh.getWhRoot();
+      if (isCloudFS(filePath, conf)) {
+        conf.setBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION, true);
+        LOG.info(" Set move optimization to true for warehouse " + filePath.toString());
+      }
+    } catch (Exception e) {
+      throw new SemanticException(e.getMessage(), e);
+    }
+
     // Now, the dumped path can be one of three things:
     // a) It can be a db dump, in which case we expect a set of dirs, each with a
     // db name, and with a _metadata file in each, and table dirs inside that.
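Illustrative sketch only, not part of the patch: the hunk above turns the move optimization on automatically when the warehouse root sits on an object store, by looking at the URI scheme (the CLOUD_SCHEME_PREFIXES whitelist and isCloudFS() in ReplicationSemanticAnalyzer). The standalone class below mimics that scheme check so it can be run in isolation; the class name and the sample warehouse URIs are made up, and unlike the real isCloudFS() it does not fall back to FileSystem.get(uri, conf).getScheme() when a path has no scheme.

import java.net.URI;
import java.util.Arrays;
import java.util.List;

public class CloudSchemeCheckSketch {
  // Same scheme whitelist as CLOUD_SCHEME_PREFIXES in the patch above.
  private static final List<String> CLOUD_SCHEMES = Arrays.asList("s3a", "wasb");

  // Simplified stand-in for isCloudFS(): classify a location purely by its URI scheme.
  static boolean looksLikeCloudFs(URI uri) {
    String scheme = uri.getScheme();
    // The patch resolves a missing scheme through the default FileSystem; here a missing
    // scheme is simply treated as "not cloud".
    return scheme != null && CLOUD_SCHEMES.contains(scheme.toLowerCase().trim());
  }

  public static void main(String[] args) {
    System.out.println(looksLikeCloudFs(URI.create("s3a://bucket/user/hive/warehouse")));    // true
    System.out.println(looksLikeCloudFs(URI.create("wasb://container@account/warehouse")));  // true
    System.out.println(looksLikeCloudFs(URI.create("hdfs://nn:8020/user/hive/warehouse")));  // false
  }
}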
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index f04cd93069..82a722fbc8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -68,7 +68,9 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met } Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); - withinContext.replicationSpec.setIsReplace(true); + // In case of ACID operations, the same directory may have many other sub-directories for different write id and + // stmt id combinations. So we can not set isReplace to true. + withinContext.replicationSpec.setIsReplace(false); EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath, qlMdTable, qlPtns, withinContext.replicationSpec, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java index 3a32885d1d..14c7f06af6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java @@ -195,7 +195,7 @@ ResourceUri destinationResourceUri(ResourceUri resourceUri) Task copyTask = ReplCopyTask.getLoadCopyTask( metadata.getReplicationSpec(), new Path(sourceUri), qualifiedDestinationPath, - context.hiveConf + context.hiveConf, false, false ); replCopyTasks.add(copyTask); ResourceUri destinationUri = diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index f32016725a..78480f22fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -65,7 +65,11 @@ * the file instead of making a duplicate copy. * If any file exist while copy, then just overwrite the file */ - OVERWRITE_EXISTING + OVERWRITE_EXISTING, + /** + * No need to rename the file; used in case of replication to S3 + */ + IGNORE } public LoadTableDesc(final LoadTableDesc o) { super(o.getSourcePath(), o.getWriteType()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java index 3ff9f2fdf2..17cc6b8c79 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java @@ -49,6 +49,12 @@ // If set to false, it'll behave as a traditional CopyTask. 
protected boolean readSrcAsFilesList = false; + protected boolean deleteDestIfExist = false; + + protected boolean isAutoPurge = false; + + protected boolean needRecycle = false; + private String distCpDoAsUser = null; public ReplCopyWork(final Path srcPath, final Path destPath, boolean errorOnSrcEmpty) { @@ -70,4 +76,28 @@ public void setDistCpDoAsUser(String distCpDoAsUser) { public String distCpDoAsUser() { return distCpDoAsUser; } + + public boolean getDeleteDestIfExist() { + return deleteDestIfExist; + } + + public void setDeleteDestIfExist(boolean deleteDestIfExist) { + this.deleteDestIfExist = deleteDestIfExist; + } + + public boolean getNeedRecycle() { + return needRecycle; + } + + public void setNeedRecycle(boolean needRecycle) { + this.needRecycle = needRecycle; + } + + public boolean getIsAutoPerge() { + return isAutoPurge; + } + + public void setAutoPurge(boolean isAutoPurge) { + this.isAutoPurge = isAutoPurge; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java b/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java index c48dc42091..6d75c2921a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java @@ -17,35 +17,25 @@ */ package org.apache.hadoop.hive.ql.session; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.*; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + /** * OperationLog wraps the actual operation log file, and provides interface * for accessing, reading, writing, and removing the file. */ public class OperationLog { - private static final Logger LOG = LoggerFactory.getLogger(OperationLog.class); + private static final Logger LOG = LoggerFactory.getLogger(OperationLog.class.getName()); private final String operationName; - private final LogFile logFile; // If in test mode then the LogDivertAppenderForTest created an extra log file containing only // the output needed for the qfile results. @@ -55,8 +45,7 @@ private final boolean isShortLogs; // True if the logs should be removed after the operation. 
Should be used only in test mode private final boolean isRemoveLogs; - - private final LoggingLevel opLoggingLevel; + private LoggingLevel opLoggingLevel = LoggingLevel.UNKNOWN; public enum LoggingLevel { NONE, EXECUTION, PERFORMANCE, VERBOSE, UNKNOWN @@ -69,8 +58,6 @@ public OperationLog(String name, File file, HiveConf hiveConf) { if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { String logLevel = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL); opLoggingLevel = getLoggingLevel(logLevel); - } else { - opLoggingLevel = LoggingLevel.UNKNOWN; } // If in test mod create a test log file which will contain only logs which are supposed to @@ -92,17 +79,15 @@ public OperationLog(String name, File file, HiveConf hiveConf) { } public static LoggingLevel getLoggingLevel (String mode) { - String m = StringUtils.defaultString(mode).toLowerCase(); - switch (m) { - case "none": + if (mode.equalsIgnoreCase("none")) { return LoggingLevel.NONE; - case "execution": + } else if (mode.equalsIgnoreCase("execution")) { return LoggingLevel.EXECUTION; - case "verbose": + } else if (mode.equalsIgnoreCase("verbose")) { return LoggingLevel.VERBOSE; - case "performance": + } else if (mode.equalsIgnoreCase("performance")) { return LoggingLevel.PERFORMANCE; - default: + } else { return LoggingLevel.UNKNOWN; } } @@ -120,8 +105,11 @@ public LoggingLevel getOpLoggingLevel() { */ public List readOperationLog(boolean isFetchFirst, long maxRows) throws SQLException { - LogFile lf = (isShortLogs) ? testLogFile : logFile; - return lf.read(isFetchFirst, maxRows); + if (isShortLogs) { + return testLogFile.read(isFetchFirst, maxRows); + } else { + return logFile.read(isFetchFirst, maxRows); + } } /** @@ -156,10 +144,8 @@ public void close() { if (isFetchFirst) { resetIn(); } - if (maxRows >= (long) Integer.MAX_VALUE) { - throw new SQLException("Cannot support loading this many rows: " + maxRows); - } - return readResults((int)maxRows); + + return readResults(maxRows); } /** @@ -168,57 +154,58 @@ public void close() { */ synchronized void close(boolean removeLog) { try { - resetIn(); - if (removeLog && !isRemoved) { - if (file.exists()) { - FileUtils.forceDelete(file); - } + if (in != null) { + in.close(); + } + if (!isRemoved && removeLog && file.exists()) { + FileUtils.forceDelete(file); isRemoved = true; } - } catch (IOException e) { - LOG.error("Failed to remove corresponding log file of operation: {}", operationName, e); + } catch (Exception e) { + LOG.error("Failed to remove corresponding log file of operation: " + operationName, e); } } private void resetIn() { - IOUtils.closeStream(in); - in = null; + if (in != null) { + IOUtils.closeStream(in); + in = null; + } } - private List readResults(final int nLines) throws SQLException { - final List logs = new ArrayList(); - int readCount = (nLines <= 0) ? 
Integer.MAX_VALUE : nLines; - + private List readResults(long nLines) throws SQLException { + List logs = new ArrayList(); if (in == null) { try { - in = new BufferedReader(new InputStreamReader( - new FileInputStream(file), StandardCharsets.UTF_8)); + in = new BufferedReader(new InputStreamReader(new FileInputStream(file))); + // Adding name of the log file in an extra log line, so it is easier to find + // the original if there is a test error if (isShortLogs) { - // Adding name of the log file in an extra log line, so it is easier to find - // the original if there is a test error logs.add("Reading log file: " + file); - readCount--; + nLines--; } } catch (FileNotFoundException e) { - return Collections.emptyList(); + return logs; } } - try { - while (readCount > 0) { - final String line = in.readLine(); - readCount--; - final boolean added = CollectionUtils.addIgnoreNull(logs, line); - if (!added) { + String line = ""; + // if nLines <= 0, read all lines in log file. + for (int i = 0; i < nLines || nLines <= 0; i++) { + try { + line = in.readLine(); + if (line == null) { break; + } else { + logs.add(line); + } + } catch (IOException e) { + if (isRemoved) { + throw new SQLException("The operation has been closed and its log file " + + file.getAbsolutePath() + " has been removed.", e); + } else { + throw new SQLException("Reading operation log file encountered an exception: ", e); } - } - } catch (IOException e) { - if (isRemoved) { - throw new SQLException("The operation has been closed and its log file " + - file.getAbsolutePath() + " will be removed", e); - } else { - throw new SQLException("Reading operation log file encountered an exception", e); } } return logs; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHivePointLookupOptimizerRule.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHivePointLookupOptimizerRule.java deleted file mode 100644 index a5932013da..0000000000 --- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHivePointLookupOptimizerRule.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.optimizer.calcite.rules; - -import static org.junit.Assert.assertEquals; - -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptSchema; -import org.apache.calcite.plan.hep.HepPlanner; -import org.apache.calcite.plan.hep.HepProgramBuilder; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.tools.RelBuilder; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; -import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Matchers; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.runners.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class TestHivePointLookupOptimizerRule { - - @Mock - private RelOptSchema schemaMock; - @Mock - RelOptHiveTable tableMock; - @Mock - Table hiveTableMDMock; - - private HepPlanner planner; - private RelBuilder builder; - - @SuppressWarnings("unused") - private static class MyRecord { - public int f1; - public int f2; - } - - @Before - public void before() { - HepProgramBuilder programBuilder = new HepProgramBuilder(); - programBuilder.addRuleInstance(new HivePointLookupOptimizerRule.FilterCondition(2)); - - planner = new HepPlanner(programBuilder.build()); - - JavaTypeFactoryImpl typeFactory = new JavaTypeFactoryImpl(); - RexBuilder rexBuilder = new RexBuilder(typeFactory); - final RelOptCluster optCluster = RelOptCluster.create(planner, rexBuilder); - RelDataType rowTypeMock = typeFactory.createStructType(MyRecord.class); - Mockito.doReturn(rowTypeMock).when(tableMock).getRowType(); - Mockito.doReturn(tableMock).when(schemaMock).getTableForMember(Matchers.any()); - Mockito.doReturn(hiveTableMDMock).when(tableMock).getHiveTableMD(); - - builder = HiveRelFactories.HIVE_BUILDER.create(optCluster, schemaMock); - - } - - public RexNode or(RexNode... args) { - return builder.call(SqlStdOperatorTable.OR, args); - } - - public RexNode and(RexNode... 
args) { - return builder.call(SqlStdOperatorTable.AND, args); - } - - public RexNode eq(String field, int value) { - return builder.call(SqlStdOperatorTable.EQUALS, - builder.field(field), builder.literal(value)); - } - - @Test - public void testSimpleCase() { - - // @formatter:off - final RelNode basePlan = builder - .scan("t") - .filter( - and( - or( - eq("f1",1), - eq("f1",2) - ), - or( - eq("f2",3), - eq("f2",4) - ) - ) - ) - .build(); - // @formatter:on - - planner.setRoot(basePlan); - RelNode optimizedRelNode = planner.findBestExp(); - - HiveFilter filter = (HiveFilter) optimizedRelNode; - RexNode condition = filter.getCondition(); - assertEquals("AND(IN($0, 1, 2), IN($1, 3, 4))", condition.toString()); - } - - @Test - public void testSimpleStructCase() { - - // @formatter:off - final RelNode basePlan = builder - .scan("t") - .filter( - or( - and( eq("f1",1),eq("f2",1)), - and( eq("f1",2),eq("f2",2)) - ) - ) - .build(); - // @formatter:on - - planner.setRoot(basePlan); - RelNode optimizedRelNode = planner.findBestExp(); - - HiveFilter filter = (HiveFilter) optimizedRelNode; - RexNode condition = filter.getCondition(); - assertEquals("IN(ROW($0, $1), ROW(1, 1), ROW(2, 2))", condition.toString()); - } - - /** Despite the fact that f2=99 is there...the extraction should happen */ - @Test - public void testObscuredSimple() { - - // @formatter:off - final RelNode basePlan = builder - .scan("t") - .filter( - or( - eq("f2",99), - eq("f1",1), - eq("f1",2) - ) - ) - .build(); - // @formatter:on - - planner.setRoot(basePlan); - RelNode optimizedRelNode = planner.findBestExp(); - - HiveFilter filter = (HiveFilter) optimizedRelNode; - RexNode condition = filter.getCondition(); - System.out.println(condition); - assertEquals("OR(IN($0, 1, 2), =($1, 99))", condition.toString()); - } -} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java index 7b6c3e7507..2771cf00b4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java @@ -92,7 +92,7 @@ public void createDestinationPath() throws IOException, SemanticException, URISy mockStatic(ReplCopyTask.class); Task mock = mock(Task.class); when(ReplCopyTask.getLoadCopyTask(any(ReplicationSpec.class), any(Path.class), any(Path.class), - any(HiveConf.class))).thenReturn(mock); + any(HiveConf.class), false, false)).thenReturn(mock); ResourceUri resourceUri = function.destinationResourceUri(new ResourceUri(ResourceType.JAR, "hdfs://localhost:9000/user/someplace/ab.jar#e094828883")); diff --git a/ql/src/test/results/clientpositive/druid/druidmini_test_ts.q.out b/ql/src/test/results/clientpositive/druid/druidmini_test_ts.q.out index f077fb2204..9c624c22e1 100644 --- a/ql/src/test/results/clientpositive/druid/druidmini_test_ts.q.out +++ b/ql/src/test/results/clientpositive/druid/druidmini_test_ts.q.out @@ -507,7 +507,7 @@ STAGE PLANS: properties: druid.fieldNames vc,cstring2 druid.fieldTypes timestamp with local time zone,string - druid.query.json 
{"queryType":"scan","dataSource":"default.druid_table_alltypesorc","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"or","fields":[{"type":"in","dimension":"__time","values":["2010-01-01T08:00:00.000Z","2011-01-01T08:00:00.000Z"],"extractionFn":{"type":"timeFormat","format":"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'","timeZone":"UTC"}},{"type":"selector","dimension":"cstring2","value":"user1"}]},"virtualColumns":[{"type":"expression","name":"vc","expression":"\"__time\"","outputType":"LONG"}],"columns":["vc","cstring2"],"resultFormat":"compactedList"} + druid.query.json {"queryType":"scan","dataSource":"default.druid_table_alltypesorc","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"or","fields":[{"type":"selector","dimension":"cstring2","value":"user1"},{"type":"selector","dimension":"__time","value":"2010-01-01T08:00:00.000Z","extractionFn":{"type":"timeFormat","format":"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'","timeZone":"UTC"}},{"type":"selector","dimension":"__time","value":"2011-01-01T08:00:00.000Z","extractionFn":{"type":"timeFormat","format":"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'","timeZone":"UTC"}}]},"virtualColumns":[{"type":"expression","name":"vc","expression":"\"__time\"","outputType":"LONG"}],"columns":["vc","cstring2"],"resultFormat":"compactedList"} druid.query.type scan Select Operator expressions: vc (type: timestamp with local time zone), cstring2 (type: string) diff --git a/ql/src/test/results/clientpositive/llap/bucketpruning1.q.out b/ql/src/test/results/clientpositive/llap/bucketpruning1.q.out index 55442ad046..260ba1cbdd 100644 --- a/ql/src/test/results/clientpositive/llap/bucketpruning1.q.out +++ b/ql/src/test/results/clientpositive/llap/bucketpruning1.q.out @@ -1542,6 +1542,9 @@ PREHOOK: type: QUERY POSTHOOK: query: explain extended select * from srcbucket_pruned where key = 1 or value = "One" or key = 2 POSTHOOK: type: QUERY +OPTIMIZED SQL: SELECT `key`, `value`, `ds` +FROM `default`.`srcbucket_pruned` +WHERE `key` = 1 OR `value` = 'One' OR `key` = 2 STAGE DEPENDENCIES: Stage-1 is a root stage Stage-0 depends on stages: Stage-1 @@ -1555,12 +1558,12 @@ STAGE PLANS: Map Operator Tree: TableScan alias: srcbucket_pruned - filterExpr: ((key) IN (1, 2) or (value = 'One')) (type: boolean) + filterExpr: ((key = 1) or (value = 'One') or (key = 2)) (type: boolean) Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE Column stats: PARTIAL GatherStats: false Filter Operator isSamplingPred: false - predicate: ((key) IN (1, 2) or (value = 'One')) (type: boolean) + predicate: ((key = 1) or (key = 2) or (value = 'One')) (type: boolean) Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE Column stats: PARTIAL Select Operator expressions: key (type: int), value (type: string), ds (type: string) diff --git a/ql/src/test/results/clientpositive/perf/spark/query15.q.out b/ql/src/test/results/clientpositive/perf/spark/query15.q.out index 3d6fbdac77..67684f6b0b 100644 --- a/ql/src/test/results/clientpositive/perf/spark/query15.q.out +++ b/ql/src/test/results/clientpositive/perf/spark/query15.q.out @@ -157,7 +157,7 @@ STAGE PLANS: outputColumnNames: _col3, _col4, _col7 Statistics: Num rows: 348467716 Data size: 47189528877 Basic stats: COMPLETE Column stats: NONE Filter Operator - predicate: ((_col3) IN ('CA', 'WA', 'GA') or (_col7 > 500) or (substr(_col4, 1, 5)) IN ('85669', '86197', '88274', '83405', '86475', '85392', '85460', '80348', '81792')) (type: boolean) + predicate: ((_col3 = 'CA') or (_col3 = 'GA') or (_col3 = 'WA') or 
(_col7 > 500) or (substr(_col4, 1, 5)) IN ('85669', '86197', '88274', '83405', '86475', '85392', '85460', '80348', '81792')) (type: boolean) Statistics: Num rows: 348467716 Data size: 47189528877 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col4 (type: string), _col7 (type: decimal(7,2)) diff --git a/ql/src/test/results/clientpositive/perf/spark/query47.q.out b/ql/src/test/results/clientpositive/perf/spark/query47.q.out index 4a66d0bbd4..a9b50921cc 100644 --- a/ql/src/test/results/clientpositive/perf/spark/query47.q.out +++ b/ql/src/test/results/clientpositive/perf/spark/query47.q.out @@ -245,20 +245,20 @@ STAGE PLANS: Map Operator Tree: TableScan alias: date_dim - filterExpr: (((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) (type: boolean) + filterExpr: (((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) (type: boolean) Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE Filter Operator - predicate: (((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) (type: boolean) - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + predicate: (((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) (type: boolean) + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: d_date_sk (type: int), d_year (type: int), d_moy (type: int) outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator key expressions: _col0 (type: int) sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: int), _col2 (type: int) Execution mode: vectorized Map 18 @@ -305,20 +305,20 @@ STAGE PLANS: Map Operator Tree: TableScan alias: date_dim - filterExpr: (((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) (type: boolean) + filterExpr: (((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) (type: boolean) Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE Filter Operator - predicate: (((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) (type: boolean) - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + predicate: (((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) (type: boolean) + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: d_date_sk (type: int), d_year (type: int), d_moy (type: int) outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE 
Column stats: NONE Reduce Output Operator key expressions: _col0 (type: int) sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: int), _col2 (type: int) Execution mode: vectorized Map 26 @@ -345,20 +345,20 @@ STAGE PLANS: Map Operator Tree: TableScan alias: date_dim - filterExpr: (((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) (type: boolean) + filterExpr: (((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) (type: boolean) Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE Filter Operator - predicate: (((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) (type: boolean) - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + predicate: (((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) (type: boolean) + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: d_date_sk (type: int), d_year (type: int), d_moy (type: int) outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator key expressions: _col0 (type: int) sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: int), _col2 (type: int) Execution mode: vectorized Map 9 diff --git a/ql/src/test/results/clientpositive/perf/spark/query57.q.out b/ql/src/test/results/clientpositive/perf/spark/query57.q.out index 502d5f7d8d..6785ee9372 100644 --- a/ql/src/test/results/clientpositive/perf/spark/query57.q.out +++ b/ql/src/test/results/clientpositive/perf/spark/query57.q.out @@ -259,20 +259,20 @@ STAGE PLANS: Map Operator Tree: TableScan alias: date_dim - filterExpr: (((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) (type: boolean) + filterExpr: (((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) (type: boolean) Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE Filter Operator - predicate: (((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) (type: boolean) - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + predicate: (((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) (type: boolean) + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: d_date_sk (type: int), d_year (type: int), d_moy (type: int) outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE 
Column stats: NONE + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator key expressions: _col0 (type: int) sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: int), _col2 (type: int) Execution mode: vectorized Map 19 @@ -319,20 +319,20 @@ STAGE PLANS: Map Operator Tree: TableScan alias: date_dim - filterExpr: (((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) (type: boolean) + filterExpr: (((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) (type: boolean) Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE Filter Operator - predicate: (((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) (type: boolean) - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + predicate: (((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) (type: boolean) + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: d_date_sk (type: int), d_year (type: int), d_moy (type: int) outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator key expressions: _col0 (type: int) sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: int), _col2 (type: int) Execution mode: vectorized Map 27 @@ -359,20 +359,20 @@ STAGE PLANS: Map Operator Tree: TableScan alias: date_dim - filterExpr: (((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) (type: boolean) + filterExpr: (((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) (type: boolean) Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE Filter Operator - predicate: (((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) (type: boolean) - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + predicate: (((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) (type: boolean) + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: d_date_sk (type: int), d_year (type: int), d_moy (type: int) outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator key expressions: _col0 (type: int) sort order: + Map-reduce partition 
columns: _col0 (type: int) - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 73048 Data size: 81740712 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: int), _col2 (type: int) Execution mode: vectorized Reducer 12 diff --git a/ql/src/test/results/clientpositive/perf/tez/query15.q.out b/ql/src/test/results/clientpositive/perf/tez/query15.q.out index 3c7ae664b1..e1eca99d95 100644 --- a/ql/src/test/results/clientpositive/perf/tez/query15.q.out +++ b/ql/src/test/results/clientpositive/perf/tez/query15.q.out @@ -71,7 +71,7 @@ Stage-0 Select Operator [SEL_23] (rows=348467716 width=135) Output:["_col4","_col7"] Filter Operator [FIL_22] (rows=348467716 width=135) - predicate:((_col3) IN ('CA', 'WA', 'GA') or (_col7 > 500) or (substr(_col4, 1, 5)) IN ('85669', '86197', '88274', '83405', '86475', '85392', '85460', '80348', '81792')) + predicate:((_col3 = 'CA') or (_col3 = 'GA') or (_col3 = 'WA') or (_col7 > 500) or (substr(_col4, 1, 5)) IN ('85669', '86197', '88274', '83405', '86475', '85392', '85460', '80348', '81792')) Merge Join Operator [MERGEJOIN_77] (rows=348467716 width=135) Conds:RS_19._col0=RS_20._col1(Inner),Output:["_col3","_col4","_col7"] <-Reducer 2 [SIMPLE_EDGE] diff --git a/ql/src/test/results/clientpositive/perf/tez/query47.q.out b/ql/src/test/results/clientpositive/perf/tez/query47.q.out index f931483d57..bd17808826 100644 --- a/ql/src/test/results/clientpositive/perf/tez/query47.q.out +++ b/ql/src/test/results/clientpositive/perf/tez/query47.q.out @@ -199,10 +199,10 @@ Stage-0 <-Map 12 [SIMPLE_EDGE] vectorized SHUFFLE [RS_282] PartitionCols:_col0 - Select Operator [SEL_281] (rows=73049 width=1119) + Select Operator [SEL_281] (rows=73048 width=1119) Output:["_col0","_col1","_col2"] - Filter Operator [FIL_280] (rows=73049 width=1119) - predicate:(((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) + Filter Operator [FIL_280] (rows=73048 width=1119) + predicate:(((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) TableScan [TS_73] (rows=73049 width=1119) default@date_dim,date_dim,Tbl:COMPLETE,Col:NONE,Output:["d_date_sk","d_year","d_moy"] <-Map 1 [SIMPLE_EDGE] vectorized @@ -222,7 +222,7 @@ Stage-0 SHUFFLE [RS_285] Group By Operator [GBY_284] (rows=1 width=12) Output:["_col0","_col1","_col2"],aggregations:["min(_col0)","max(_col0)","bloom_filter(_col0, expectedEntries=1000000)"] - Select Operator [SEL_283] (rows=73049 width=1119) + Select Operator [SEL_283] (rows=73048 width=1119) Output:["_col0"] Please refer to the previous Select Operator [SEL_281] <-Reducer 15 [BROADCAST_EDGE] vectorized diff --git a/ql/src/test/results/clientpositive/perf/tez/query57.q.out b/ql/src/test/results/clientpositive/perf/tez/query57.q.out index fed340af9b..1d3c17d782 100644 --- a/ql/src/test/results/clientpositive/perf/tez/query57.q.out +++ b/ql/src/test/results/clientpositive/perf/tez/query57.q.out @@ -193,10 +193,10 @@ Stage-0 <-Map 12 [SIMPLE_EDGE] vectorized SHUFFLE [RS_282] PartitionCols:_col0 - Select Operator [SEL_281] (rows=73049 width=1119) + Select Operator [SEL_281] (rows=73048 width=1119) Output:["_col0","_col1","_col2"] - Filter Operator [FIL_280] (rows=73049 width=1119) - predicate:(((struct(d_year,d_moy)) IN (const struct(1999,12), const struct(2001,1)) or (d_year = 2000)) and d_date_sk is not null) + Filter Operator [FIL_280] (rows=73048 width=1119) + 
predicate:(((d_year = 2000) or ((d_year = 1999) and (d_moy = 12)) or ((d_year = 2001) and (d_moy = 1))) and d_date_sk is not null) TableScan [TS_73] (rows=73049 width=1119) default@date_dim,date_dim,Tbl:COMPLETE,Col:NONE,Output:["d_date_sk","d_year","d_moy"] <-Map 1 [SIMPLE_EDGE] vectorized @@ -216,7 +216,7 @@ Stage-0 SHUFFLE [RS_285] Group By Operator [GBY_284] (rows=1 width=12) Output:["_col0","_col1","_col2"],aggregations:["min(_col0)","max(_col0)","bloom_filter(_col0, expectedEntries=1000000)"] - Select Operator [SEL_283] (rows=73049 width=1119) + Select Operator [SEL_283] (rows=73048 width=1119) Output:["_col0"] Please refer to the previous Select Operator [SEL_281] <-Reducer 15 [BROADCAST_EDGE] vectorized diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java index 294dfb728e..da5a71cc64 100755 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java @@ -140,11 +140,8 @@ public FileSystem getFs(Path f) throws MetaException { */ public static Path getDnsPath(Path path, Configuration conf) throws MetaException { FileSystem fs = getFs(path, conf); - String uriPath = path.toUri().getPath(); - if (StringUtils.isEmpty(uriPath)) { - uriPath = "/"; - } - return (new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), uriPath)); + return (new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), path + .toUri().getPath())); } public Path getDnsPath(Path path) throws MetaException { diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java index 6ca3e5d8bb..9b460afba2 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; import static org.junit.Assert.assertEquals; @@ -82,6 +83,8 @@ public CallerArguments(String dbName) { private static com.google.common.base.Function callerVerifier = null; + private static com.google.common.base.Function addNotificationEventModifier = null; + // Methods to set/reset getTable modifier public static void setGetTableBehaviour(com.google.common.base.Function modifier){ getTableModifier = (modifier == null) ? com.google.common.base.Functions.identity() : modifier; @@ -115,6 +118,14 @@ public static void setGetNextNotificationBehaviour( getNextNotificationModifier = (modifier == null)? 
com.google.common.base.Functions.identity() : modifier; } + public static void setAddNotificationModifier(com.google.common.base.Function modifier) { + addNotificationEventModifier = modifier; + } + + public static void resetAddNotificationModifier() { + setAddNotificationModifier(null); + } + public static void resetGetNextNotificationBehaviour(){ setGetNextNotificationBehaviour(null); } @@ -156,6 +167,19 @@ public NotificationEventResponse getNextNotification(NotificationEventRequest rq return getNextNotificationModifier.apply(super.getNextNotification(rqst)); } + @Override + public void addNotificationEvent(NotificationEvent entry) throws MetaException { + if (addNotificationEventModifier != null) { + Boolean success = addNotificationEventModifier.apply(entry); + if ((success != null) && !success) { + throw new MetaException("InjectableBehaviourObjectStore: Invalid addNotificationEvent operation on DB: " + + entry.getDbName() + " table: " + entry.getTableName() + " event : " + entry.getEventType()); + } + } else { + super.addNotificationEvent(entry); + } + } + @Override public void createTable(Table tbl) throws InvalidObjectException, MetaException { if (callerVerifier != null) {