diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a120b4573d..41bb1f743c 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -478,6 +478,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false, "Indicates whether replication dump only metadata information or data + metadata. \n" + "This config makes hive.repl.include.external.tables config ineffective."), + REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE("hive.repl.dump.metadata.only.for.external.table", + false, + "Indicates whether external table replication dump only metadata information or data + metadata"), REPL_BOOTSTRAP_ACID_TABLES("hive.repl.bootstrap.acid.tables", false, "Indicates if repl dump should bootstrap the information about ACID tables along with \n" + "incremental dump for replication. It is recommended to keep this config parameter \n" diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetadataReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetadataReplicationScenariosExternalTables.java new file mode 100644 index 0000000000..7ac1975af5 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetadataReplicationScenariosExternalTables.java @@ -0,0 +1,636 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.*; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; +import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; +import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG; +import static org.junit.Assert.*; + +public class TestMetadataReplicationScenariosExternalTables extends BaseReplicationAcrossInstances { + + private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; + String extraPrimaryDb; + + @BeforeClass + public static void classLevelSetup() throws Exception { + HashMap overrides = new HashMap<>(); + overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false"); + overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true"); + overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname, "true"); + overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, + UserGroupInformation.getCurrentUser().getUserName()); + + internalBeforeClassSetup(overrides, TestReplicationScenarios.class); + } + + @Before + public void setup() throws Throwable { + super.setup(); + extraPrimaryDb = "extra_" + primaryDbName; + } + + @After + public void tearDown() throws Throwable { + primary.run("drop database if exists " + extraPrimaryDb + " cascade"); + super.tearDown(); + } + + @Test + public void replicationWithoutExternalTables() throws Throwable { + List loadWithClause = externalTableBasePathWithClause(); + List dumpWithClause + = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); + + + + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump(primaryDbName, null, dumpWithClause); + + // the _external_tables_file info only 
should be created if external tables are to be replicated not otherwise + assertFalse(primary.miniDFSCluster.getFileSystem() + .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[] { "t1" }) + .run("show tables like 't2'") + .verifyFailure(new String[] { "t2" }) + .verifyReplTargetProperty(replicatedDbName); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("insert into table t3 values (20)") + .dump(primaryDbName, tuple.lastReplicationId, dumpWithClause); + + // the _external_tables_file info only should be created if external tables are to be replicated not otherwise + assertFalse(primary.miniDFSCluster.getFileSystem() + .exists(new Path(tuple.dumpLocation, FILE_NAME))); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't3'") + .verifyFailure(new String[] { "t3" }) + .verifyReplTargetProperty(replicatedDbName); + } + + @Test + public void externalTableReplicationWithDefaultPaths() throws Throwable { + //creates external tables with partitions + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump("repl dump " + primaryDbName); + + // verify that the external table info is not written as metadata only replication + assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + + List withClauseOptions = externalTableBasePathWithClause(); + + replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("select country from t2 where country = 'us'") + .verifyResult(null) + .run("select country from t2 where country = 'france'") + .verifyResult(null); + + // Ckpt should be set on bootstrapped db. 
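+    // (Presumably the checkpoint is recorded as a property on the target database keyed by this dump
+    // location, which is what lets a retried bootstrap load detect and skip already-applied objects.)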
+    replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation);
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("create external table t3 (id int)")
+        .run("insert into table t3 values (10)")
+        .run("create external table t4 as select id from t3")
+        .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId);
+
+    // verify that the external table info is not written for the incremental dump,
+    // since external tables are replicated as metadata only
+    assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME));
+
+    replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("select id from t3")
+        .verifyResult(null)
+        .run("select id from t4")
+        .verifyResult(null);
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("drop table t1")
+        .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId);
+
+    // verify that the external table info is not written for the incremental dump,
+    // since external tables are replicated as metadata only
+    assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME));
+  }
+
+  @Test
+  public void externalTableReplicationWithCustomPaths() throws Throwable {
+    Path externalTableLocation =
+        new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    // Create base directory but use HDFS path without schema or authority details.
+    // Hive should pick up the local cluster's HDFS schema/authority.
+    externalTableBasePathWithClause();
+    List<String> loadWithClause = Arrays.asList(
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" +
+            REPLICA_EXTERNAL_BASE + "'",
+        "'distcp.options.update'=''"
+    );
+
+    WarehouseInstance.Tuple bootstrapTuple = primary.run("use " + primaryDbName)
+        .run("create external table a (i int, j int) " +
+            "row format delimited fields terminated by ',' " +
+            "location '" + externalTableLocation.toUri() + "'")
+        .dump(primaryDbName, null);
+
+    replica.load(replicatedDbName, bootstrapTuple.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 'a'")
+        .verifyResults(Collections.singletonList("a"))
+        .run("select * From a").verifyResults(Collections.emptyList());
+
+    // externally add data to the table location
+    try (FSDataOutputStream outputStream =
+        fs.create(new Path(externalTableLocation, "file1.txt"))) {
+      outputStream.write("1,2\n".getBytes());
+      outputStream.write("13,21\n".getBytes());
+    }
+
+    WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)")
+        .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+
+    replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+        .run("select i From a")
+        .verifyResults(new String[] {})
+        .run("select j from a")
+        .verifyResults(new String[] {});
+
+    // alter table location to something new.
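+    // (No data files are copied for external tables in this metadata-only mode, so the selects on the
+    // replica below are expected to stay empty even after the location change.)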
+ externalTableLocation = + new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/"); + incrementalTuple = primary.run("use " + primaryDbName) + .run("alter table a set location '" + externalTableLocation + "'") + .dump(primaryDbName, incrementalTuple.lastReplicationId); + + replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select i From a") + .verifyResults(Collections.emptyList()); + } + + @Test + public void externalTableWithPartitions() throws Throwable { + Path externalTableLocation = + new Path("/" + testName.getMethodName() + "/t2/"); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + List loadWithClause = externalTableBasePathWithClause(); + + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create external table t2 (place string) partitioned by (country string) row format " + + "delimited fields terminated by ',' location '" + externalTableLocation.toString() + + "'") + .run("insert into t2 partition(country='india') values ('bangalore')") + .dump("repl dump " + primaryDbName); + + assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't2'") + .verifyResults(new String[] { "t2" }) + .run("select place from t2") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + // add new data externally, to a partition, but under the table level top directory + Path partitionDir = new Path(externalTableLocation, "country=india"); + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) { + outputStream.write("pune\n".getBytes()); + outputStream.write("mumbai\n".getBytes()); + } + + tuple = primary.run("use " + primaryDbName) + .run("insert into t2 partition(country='australia') values ('sydney')") + .dump(primaryDbName, tuple.lastReplicationId); + + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select distinct(country) from t2") + .verifyResults(new String[] {}) + .run("select place from t2 where country='india'") + .verifyResults(new String[] {}) + .run("select place from t2 where country='australia'") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + Path customPartitionLocation = + new Path("/" + testName.getMethodName() + "/partition_data/t2/country=france"); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + // add new partitions to the table, at an external location than the table level directory + try (FSDataOutputStream outputStream = fs + .create(new Path(customPartitionLocation, "file.txt"))) { + outputStream.write("paris".getBytes()); + } + + tuple = primary.run("use " + primaryDbName) + .run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation + .toString() + "'") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select place from t2 where country='france'") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + // change the location of the partition via alter command + String 
tmpLocation = "/tmp/" + System.nanoTime(); + primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation), new FsPermission("777")); + + tuple = primary.run("use " + primaryDbName) + .run("alter table t2 partition (country='france') set location '" + tmpLocation + "'") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select place from t2 where country='france'") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + // Changing location of the external table, should result in changes to the location of + // partition residing within the table location and not the partitions located outside. + String tmpLocation2 = "/tmp/" + System.nanoTime() + "_2"; + primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation2), new FsPermission("777")); + + tuple = primary.run("use " + primaryDbName) + .run("insert into table t2 partition(country='france') values ('lyon')") + .run("alter table t2 set location '" + tmpLocation2 + "'") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause); + } + + @Test + public void externalTableIncrementalReplication() throws Throwable { + WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName); + replica.load(replicatedDbName, tuple.dumpLocation); + + Path externalTableLocation = + new Path("/" + testName.getMethodName() + "/t1/"); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t1 (place string) partitioned by (country string) row format " + + "delimited fields terminated by ',' location '" + externalTableLocation.toString() + + "'") + .run("alter table t1 add partition(country='india')") + .run("alter table t1 add partition(country='us')") + .dump(primaryDbName, tuple.lastReplicationId); + + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + + // Add new data externally, to a partition, but under the partition level top directory + // Also, it is added after dumping the events but data should be seen at target after REPL LOAD. + Path partitionDir = new Path(externalTableLocation, "country=india"); + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) { + outputStream.write("pune\n".getBytes()); + outputStream.write("mumbai\n".getBytes()); + } + + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) { + outputStream.write("bangalore\n".getBytes()); + } + + List loadWithClause = externalTableBasePathWithClause(); + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show partitions t1") + .verifyResults(new String[] { "country=india", "country=us" }) + .run("select place from t1 order by place") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + // Delete one of the file and update another one. + fs.delete(new Path(partitionDir, "file.txt"), true); + fs.delete(new Path(partitionDir, "file1.txt"), true); + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) { + outputStream.write("chennai\n".getBytes()); + } + + // Repl load with zero events but external tables location info should present. 
+ tuple = primary.dump(primaryDbName, tuple.lastReplicationId); + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show partitions t1") + .verifyResults(new String[] { "country=india", "country=us" }) + .run("select place from t1 order by place") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + Hive hive = Hive.get(replica.getConf()); + Set partitions = + hive.getAllPartitionsOf(hive.getTable(replicatedDbName + ".t1")); + List paths = partitions.stream().map(p -> p.getDataLocation().toUri().getPath()) + .collect(Collectors.toList()); + + tuple = primary + .run("alter table t1 drop partition (country='india')") + .run("alter table t1 drop partition (country='us')") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation) + .run("select * From t1") + .verifyResults(new String[] {}) + .verifyReplTargetProperty(replicatedDbName); + + for (String path : paths) { + assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path))); + } + } + + @Test + public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { + List loadWithClause = externalTableBasePathWithClause(); + List dumpWithClause + = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); + + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump(primaryDbName, null, dumpWithClause); + + // the _external_tables_file info only should be created if external tables are to be replicated not otherwise + assertFalse(primary.miniDFSCluster.getFileSystem() + .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[] {"t1" }) + .run("show tables like 't2'") + .verifyFailure(new String[] {"t2" }) + .verifyReplTargetProperty(replicatedDbName); + + dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'"); + tuple = primary.run("use " + primaryDbName) + .run("drop table t1") + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("insert into table t3 values (20)") + .run("create table t4 as select * from t3") + .dump(primaryDbName, tuple.lastReplicationId, dumpWithClause); + + // the _external_tables_file info should be created as external tables are to be replicated. 
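+    // (FILE_NAME is the external table data location file written by ReplExternalTables.Writer at the
+    // dump root; REPL LOAD reads it to set up the data copy (DirCopyWork) for each listed location.)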
+ assertTrue(primary.miniDFSCluster.getFileSystem() + .exists(new Path(tuple.dumpLocation, FILE_NAME))); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t2", "t3"), + new Path(tuple.dumpLocation, FILE_NAME)); + + // _bootstrap directory should be created as bootstrap enabled on external tables. + Path dumpPath = new Path(tuple.dumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME); + assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath)); + + // _bootstrap//t2 + // _bootstrap//t3 + Path dbPath = new Path(dumpPath, primaryDbName); + Path tblPath = new Path(dbPath, "t2"); + assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); + tblPath = new Path(dbPath, "t3"); + assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[] {"t1" }) + .run("show tables like 't2'") + .verifyResult("t2") + .run("show tables like 't3'") + .verifyResult("t3") + .run("show tables like 't4'") + .verifyResult("t4") + .verifyReplTargetProperty(replicatedDbName); + + // Ckpt should be set on bootstrapped tables. + replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", "t3"), tuple.dumpLocation); + + // Drop source tables to see if target points to correct data or not after bootstrap load. + primary.run("use " + primaryDbName) + .run("drop table t2") + .run("drop table t3"); + + // Create table event for t4 should be applied along with bootstrapping of t2 and t3 + replica.run("use " + replicatedDbName) + .run("select place from t2 where country = 'us'") + .verifyResult("austin") + .run("select place from t2 where country = 'france'") + .verifyResult("paris") + .run("select id from t3 order by id") + .verifyResults(Arrays.asList("10", "20")) + .run("select id from t4 order by id") + .verifyResults(Arrays.asList("10", "20")) + .verifyReplTargetProperty(replicatedDbName); + } + + @Test + public void testExternalTablesIncReplicationWithConcurrentDropTable() throws Throwable { + List dumpWithClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" + ); + List loadWithClause = externalTableBasePathWithClause(); + WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .dump(primaryDbName, null, dumpWithClause); + + replica.load(replicatedDbName, tupleBootstrap.dumpLocation, loadWithClause); + + // Insert a row into "t1" and create another external table using data from "t1". + primary.run("use " + primaryDbName) + .run("insert into table t1 values (2)") + .run("create external table t2 as select * from t1"); + + // Inject a behavior so that getTable returns null for table "t1". This ensures the table is + // skipped for data files listing. 
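+    // (InjectableBehaviourObjectStore is a test-only ObjectStore wrapper; the injected function is applied
+    // to the result of getTable(), so returning null for t1 simulates a concurrent drop of the table while
+    // the dump is listing external table data locations.)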
+ BehaviourInjection tableNuller = new BehaviourInjection() { + @Nullable + @Override + public Table apply(@Nullable Table table) { + LOG.info("Performing injection on table " + table.getTableName()); + if (table.getTableName().equalsIgnoreCase("t1")){ + injectionPathCalled = true; + return null; + } else { + nonInjectedPathCalled = true; + return table; + } + } + }; + InjectableBehaviourObjectStore.setGetTableBehaviour(tableNuller); + WarehouseInstance.Tuple tupleInc; + try { + // The t1 table will be skipped from data location listing. + tupleInc = primary.dump(primaryDbName, tupleBootstrap.lastReplicationId, dumpWithClause); + tableNuller.assertInjectionsPerformed(true, true); + } finally { + InjectableBehaviourObjectStore.resetGetTableBehaviour(); // reset the behaviour + } + + // Only table t2 should exist in the data location list file. + assertFalseExternalFileInfo(new Path(tupleInc.dumpLocation, FILE_NAME)); + + // The newly inserted data "2" should be missing in table "t1". But, table t2 should exist and have + // inserted data. + replica.load(replicatedDbName, tupleInc.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select id from t1 order by id") + .verifyResult(null) + .run("select id from t2 order by id") + .verifyResults(new String[]{}); + } + + @Test + public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { + List loadWithClause = externalTableBasePathWithClause(); + List dumpWithClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" + ); + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .dump(primaryDbName, null, dumpWithClause); + + replica.load(replicatedDbName, tuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId); + + // This looks like an empty dump but it has the ALTER TABLE event created by the previous + // dump. We need it here so that the next dump won't have any events. + WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, tuple.lastReplicationId, dumpWithClause); + replica.load(replicatedDbName, incTuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(incTuple.lastReplicationId); + + // create events for some other database and then dump the primaryDbName to dump an empty directory. + primary.run("create database " + extraPrimaryDb + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + WarehouseInstance.Tuple inc2Tuple = primary.run("use " + extraPrimaryDb) + .run("create table tbl (fld int)") + .run("use " + primaryDbName) + .dump(primaryDbName, incTuple.lastReplicationId, dumpWithClause); + Assert.assertEquals(primary.getCurrentNotificationEventId().getEventId(), + Long.valueOf(inc2Tuple.lastReplicationId).longValue()); + + // Incremental load to existing database with empty dump directory should set the repl id to the last event at src. 
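+    // (The events generated above belong to extraPrimaryDb and are filtered out of this dump, so the dump
+    // carries no events for primaryDbName while still recording the latest event id at the source.)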
+ replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(inc2Tuple.lastReplicationId); + } + + private List externalTableBasePathWithClause() throws IOException, SemanticException { + return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + } + + private void assertFalseExternalFileInfo(Path externalTableInfoFile) + throws IOException { + DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem(); + Assert.assertFalse(fileSystem.exists(externalTableInfoFile)); + } + + private void assertExternalFileInfo(List expected, Path externalTableInfoFile) + throws IOException { + ReplicationTestUtils.assertExternalFileInfo(primary, expected, externalTableInfoFile); + } +} diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java index cb3d9cc4e0..d9c5666bc4 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java @@ -39,21 +39,13 @@ private BufferAllocator allocator; private ArrowStreamReader arrowStreamReader; - //Allows client to provide and manage their own arrow BufferAllocator public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class clazz, - JobConf job, Closeable client, Socket socket, BufferAllocator allocator) throws IOException { + JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException { super(in, schema, clazz, job, client, socket); - this.allocator = allocator; + allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit); this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator); } - //Use the global arrow BufferAllocator - public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class clazz, - JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException { - this(in, schema, clazz, job, client, socket, - RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit)); - } - @Override public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOException { try { @@ -84,9 +76,6 @@ public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOExcep @Override public void close() throws IOException { arrowStreamReader.close(); - //allocator.close() will throw exception unless all buffers have been released - //See org.apache.arrow.memory.BaseAllocator.close() - allocator.close(); } } diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java index 46566be332..fafbdee210 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java @@ -25,28 +25,16 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import java.io.IOException; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory; -import java.util.UUID; /* * Adapts an Arrow batch reader to a row reader - * Only used for testing */ public class LlapArrowRowInputFormat implements InputFormat { private LlapBaseInputFormat baseInputFormat; public 
LlapArrowRowInputFormat(long arrowAllocatorLimit) { - BufferAllocator allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit).newChildAllocator( - //allocator name, use UUID for testing - UUID.randomUUID().toString(), - //No use for reservation, allocators claim memory from the same pool, - //but allocate/releases are tracked per-allocator - 0, - //Limit passed in by client - arrowAllocatorLimit); - baseInputFormat = new LlapBaseInputFormat(true, allocator); + baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit); } @Override diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 6bf7f33f64..5c99655104 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -66,7 +66,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.arrow.memory.BufferAllocator; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.Credentials; @@ -108,7 +107,6 @@ private String query; private boolean useArrow; private long arrowAllocatorLimit; - private BufferAllocator allocator; private final Random rand = new Random(); public static final String URL_KEY = "llap.if.hs2.connection"; @@ -131,17 +129,11 @@ public LlapBaseInputFormat(String url, String user, String pwd, String query) { this.query = query; } - //Exposed only for testing, clients should use LlapBaseInputFormat(boolean, BufferAllocator instead) public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) { this.useArrow = useArrow; this.arrowAllocatorLimit = arrowAllocatorLimit; } - public LlapBaseInputFormat(boolean useArrow, BufferAllocator allocator) { - this.useArrow = useArrow; - this.allocator = allocator; - } - public LlapBaseInputFormat() { this.useArrow = false; } @@ -222,19 +214,10 @@ public LlapBaseInputFormat() { @SuppressWarnings("rawtypes") LlapBaseRecordReader recordReader; if(useArrow) { - if(allocator != null) { - //Client provided their own allocator - recordReader = new LlapArrowBatchRecordReader( - socket.getInputStream(), llapSplit.getSchema(), - ArrowWrapperWritable.class, job, llapClient, socket, - allocator); - } else { - //Client did not provide their own allocator, use constructor for global allocator - recordReader = new LlapArrowBatchRecordReader( - socket.getInputStream(), llapSplit.getSchema(), - ArrowWrapperWritable.class, job, llapClient, socket, - arrowAllocatorLimit); - } + recordReader = new LlapArrowBatchRecordReader( + socket.getInputStream(), llapSplit.getSchema(), + ArrowWrapperWritable.class, job, llapClient, socket, + arrowAllocatorLimit); } else { recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), BytesWritable.class, job, llapClient, (java.io.Closeable)socket); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 622433bb10..5b21b918b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -172,7 +172,8 @@ private boolean shouldExamineTablesToDump() { */ private boolean shouldDumpExternalTableLocation() { return 
conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) - && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); + && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) && + !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE)); } /** @@ -493,7 +494,8 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) Exception caught = null; boolean shouldWriteExternalTableLocationInfo = conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) - && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); + && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) && + !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE)); try (Writer writer = new Writer(dbRoot, conf)) { for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index 4c504be894..c7aa0077a6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -106,7 +106,8 @@ public static Path externalTableDataPath(HiveConf hiveConf, Path basePath, Path this.hiveConf = hiveConf; writePath = new Path(dbRoot, FILE_NAME); includeExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); - dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); + dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || + hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE); if (shouldWrite()) { this.writer = FileSystem.get(hiveConf).create(writePath); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 276f759a7e..810a4c5284 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -49,12 +49,12 @@ import java.util.List; import java.util.Map; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Reader; -import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES; +import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Reader; +import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java index d01e24c385..f0c875f5bd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -57,7 +58,7 @@ public HiveWrapper(Hive db, String dbName, long lastReplId) { public Tuple table(final String tableName, HiveConf conf) throws HiveException { // Column statistics won't be accurate if we are dumping only metadata - boolean getColStats = !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); + boolean getColStats = !Utils.shouldDumpMetaDataOnly(db.getTable(tableName), conf); return new Tuple<>(functionForSpec, () -> db.getTable(dbName, tableName, true, false, getColStats)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 01b7fdc4b6..3d64c7b385 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -72,8 +73,8 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication ? null : tableSpec; this.replicationSpec = replicationSpec; - if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || - (this.tableSpec != null && this.tableSpec.tableHandle.isView())) { + if (this.tableSpec != null && this.tableSpec.tableHandle.isView() || + Utils.shouldDumpMetaDataOnly(tableSpec.tableHandle, conf)) { this.replicationSpec.setIsMetadataOnly(true); this.tableSpec.tableHandle.setStatsStateLikeNewTable(); @@ -92,7 +93,8 @@ public boolean write() throws SemanticException { } else if (shouldExport()) { PartitionIterable withPartitions = getPartitions(); writeMetaData(withPartitions); - if (!replicationSpec.isMetadataOnly()) { + if (!replicationSpec.isMetadataOnly() + && !tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE)) { writeData(withPartitions); } return true; @@ -158,10 +160,8 @@ private void writeData(PartitionIterable partitions) throws SemanticException { } else { List dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(), replicationSpec, conf); - - // this is the data copy new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) - .export(replicationSpec); + .export(replicationSpec); } } catch (Exception e) { throw new SemanticException(e.getMessage(), e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index bc9f06dfa9..6f8912b5f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import 
org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -270,4 +271,11 @@ public static boolean shouldReplicate(NotificationEvent tableForEvent, return Collections.singletonList(fromPath); } } + + public static boolean shouldDumpMetaDataOnly(Table table, HiveConf conf) { + return conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || + (conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) && + table.getTableType().equals(TableType.EXTERNAL_TABLE) && + conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE)); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java index 0168240829..aedf69870a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java @@ -229,7 +229,7 @@ public void handle(Context withinContext) throws Exception { // If we are not dumping metadata about a table, we shouldn't be dumping basic statistics // as well, since that won't be accurate. So reset them to what they would look like for an // empty table. - if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { + if (Utils.shouldDumpMetaDataOnly(qlMdTableAfter, withinContext.hiveConf)) { qlMdTableAfter.setStatsStateLikeNewTable(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index 837d51c8c8..ec488f726a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; import org.apache.hadoop.hive.ql.metadata.Table; @@ -66,7 +67,7 @@ public void handle(Context withinContext) throws Exception { // If we are not dumping data about a table, we shouldn't be dumping basic statistics // as well, since that won't be accurate. So reset them to what they would look like for an // empty table. - if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { + if (Utils.shouldDumpMetaDataOnly(qlMdTable, withinContext.hiveConf)) { qlMdTable.setStatsStateLikeNewTable(); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestNumMetastoreCalls.java b/ql/src/test/org/apache/hadoop/hive/ql/TestNumMetastoreCalls.java new file mode 100644 index 0000000000..6e755d588d --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestNumMetastoreCalls.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +// this test is to ensure number of calls to metastore server by query compiler +public class TestNumMetastoreCalls { + static HiveConf hConf = null; + static Driver driver = null; + + @BeforeClass + public static void Setup() throws Exception { + hConf = new HiveConf(Driver.class); + driver = setUpImpl(hConf); + driver.run("create table t1(id1 int, name1 string)"); + driver.run("create table t2(id2 int, id1 int, name2 string)"); + driver.run("create database db1"); + driver.run("create table db1.tdb1(id2 int, id1 int, name2 string)"); + driver.run("create table tpart(id2 int, id1 int)" + + " partitioned by (name string)"); + driver.run("alter table tpart add partition (name='p1')") ; + driver.run("alter table tpart add partition (name='p2')") ; + } + + @AfterClass + public static void Teardown() throws Exception { + driver.run("drop table t1"); + driver.run("drop table t2"); + driver.run("drop table db1.tdb1"); + driver.run("drop database db1 cascade"); + } + + private static Driver setUpImpl(HiveConf hiveConf) throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, true); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.RAW_STORE_IMPL, + "org.apache.hadoop.hive.ql.TestNumMetastoreCallsObjectStore"); + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.SCHEDULED_QUERIES_ENABLED, false); + SessionState.start(hiveConf); + return new Driver(hiveConf); + } + + // compiler should do 6 metastore calls for each table reference + // get table, get table col statistics + // pk, fk, unique, not null constraints + // for partitioned table there would be an extra call to get partitions + @Test + public void testSelectQuery() { + int numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + int numCallsAfter = 0; + + // simple select * + String query1 = "select * from t1"; + int rc = driver.compile(query1, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 6); + + // single table + numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + String query2 = "select count(distinct id1) from t1 group by name1"; + rc = driver.compile(query2, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 6); + + // two different tables + numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + String query3 = "select count(*) from t1 join t2 on t1.id1 = t2.id1 " + + "where t2.id2 > 
0 group by t1.name1, t2.name2"; + rc = driver.compile(query3, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 12 ); + + //from different dbs + numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + String query4 = "select count(*) from t1 join db1.tdb1 as t2 on t1.id1 = t2.id1 " + + "where t2.id2 > 0 group by t1.name1, t2.name2"; + rc = driver.compile(query4, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 12); + + // three table join + numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + String query5 = "select count(*) from t1 join db1.tdb1 as dbt2 on t1.id1 = dbt2.id1 " + + "join t2 on t1.id1 = t2.id1 " + + "where t2.id2 > 0 group by t1.name1, t2.name2"; + rc = driver.compile(query5, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 18); + + // single partitioned table + numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + String query6 = "select count(distinct id1) from tpart group by name"; + rc = driver.compile(query6, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 7); + + // two different tables + numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + String query7 = "select count(*) from t1 join tpart on t1.id1 = tpart.id1 " + + "where tpart.id2 > 0 group by t1.name1, tpart.name"; + rc = driver.compile(query7, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 13); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestNumMetastoreCallsObjectStore.java b/ql/src/test/org/apache/hadoop/hive/ql/TestNumMetastoreCallsObjectStore.java new file mode 100644 index 0000000000..df72c422bb --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestNumMetastoreCallsObjectStore.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.metastore.*; +import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.Catalog; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; +import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec; +import org.apache.hadoop.hive.metastore.api.GetPartitionsProjectionSpec; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionValuesResponse; +import org.apache.hadoop.hive.metastore.api.RuntimeStat; +import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint; +import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableMeta; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; +import org.apache.thrift.TException; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +// this class is a test wrapper around ObjectStore overriding most of the get methods +// used by compiler. 
This is used my TestNumMetastoreCalls to ensure the number of calls to +// metastore +public class TestNumMetastoreCallsObjectStore extends ObjectStore { + + static Map callMap = new HashMap<>(); + static private int numCalls = 0; + + static void incrementCall() { + numCalls++; + } + + static int getNumCalls() { + return numCalls; + } + + public TestNumMetastoreCallsObjectStore() { + super(); + } + + + @Override public Catalog getCatalog(String catalogName) + throws NoSuchObjectException, MetaException { + incrementCall(); + return super.getCatalog(catalogName); + } + + @Override public List getCatalogs() throws MetaException { + incrementCall(); + return super.getCatalogs(); + } + + @Override public Database getDatabase(String catalogName, String name) + throws NoSuchObjectException { + incrementCall(); + return super.getDatabase(catalogName, name); + } + + @Override public List getDatabases(String catName, String pattern) throws MetaException { + incrementCall(); + return super.getDatabases(catName, pattern); + } + + @Override public List getAllDatabases(String catName) throws MetaException { + incrementCall(); + return super.getAllDatabases(catName); + } + + @Override public Table getTable(String catName, String dbName, String tableName, + String writeIdList) throws MetaException { + incrementCall(); + return super.getTable(catName, dbName, tableName, writeIdList); + } + + @Override public List getTables(String catName, String dbName, String pattern) + throws MetaException { + incrementCall(); + return super.getTables(catName, dbName, pattern); + } + + @Override public List getTables(String catName, String dbName, String pattern, + TableType tableType, int limit) throws MetaException { + incrementCall(); + return super.getTables(catName, dbName, pattern, tableType, limit); + } + + @Override public List getTableNamesWithStats() + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getTableNamesWithStats(); + } + + @Override public Map> getPartitionColsWithStats(String catName, + String dbName, String tableName) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionColsWithStats(catName, dbName, tableName); + } + + @Override public List getAllTableNamesForStats() + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getAllTableNamesForStats(); + } + + @Override public List
getAllMaterializedViewObjectsForRewriting(String catName) + throws MetaException { + incrementCall(); + return super.getAllMaterializedViewObjectsForRewriting(catName); + } + + @Override public List getMaterializedViewsForRewriting(String catName, String dbName) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getMaterializedViewsForRewriting(catName, dbName); + } + + @Override public int getDatabaseCount() throws MetaException { + incrementCall(); + return super.getDatabaseCount(); + } + + @Override public int getPartitionCount() throws MetaException { + incrementCall(); + return super.getPartitionCount(); + } + + @Override public int getTableCount() throws MetaException { + incrementCall(); + return super.getTableCount(); + } + + @Override public List getTableMeta(String catName, String dbNames, String tableNames, + List tableTypes) throws MetaException { + incrementCall(); + return super.getTableMeta(catName, dbNames, tableNames, tableTypes); + } + + @Override public List getAllTables(String catName, String dbName) throws MetaException { + incrementCall(); + return super.getAllTables(catName, dbName); + } + + @Override public List
getTableObjectsByName(String catName, String db, + List tbl_names) throws MetaException, UnknownDBException { + incrementCall(); + return super.getTableObjectsByName(catName, db, tbl_names); + } + + @Override public Partition getPartition(String catName, String dbName, String tableName, + List part_vals) throws NoSuchObjectException, MetaException { + incrementCall(); + return super.getPartition(catName, dbName, tableName, part_vals); + } + + @Override public Partition getPartition(String catName, String dbName, String tableName, + List part_vals, String validWriteIds) throws NoSuchObjectException, MetaException { + incrementCall(); + return super.getPartition(catName, dbName, tableName, part_vals, validWriteIds); + } + + @Override public List getPartitions(String catName, String dbName, String tableName, + int maxParts) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitions(catName, dbName, tableName, maxParts); + } + + @Override public Map getPartitionLocations(String catName, String dbName, + String tblName, String baseLocationToNotShow, int max) { + incrementCall(); + return super.getPartitionLocations(catName, dbName, tblName, baseLocationToNotShow, max); + } + + @Override public List getPartitionsWithAuth(String catName, String dbName, + String tblName, short max, String userName, List groupNames) + throws MetaException, InvalidObjectException { + incrementCall(); + return super.getPartitionsWithAuth(catName, dbName, tblName, max, userName, groupNames); + } + + @Override public Partition getPartitionWithAuth(String catName, String dbName, String tblName, + List partVals, String user_name, List group_names) + throws NoSuchObjectException, MetaException, InvalidObjectException { + incrementCall(); + return super.getPartitionWithAuth(catName, dbName, tblName, partVals, user_name, group_names); + } + + @Override public List listPartitionNames(String catName, String dbName, String tableName, + short max) throws MetaException { + incrementCall(); + return super.listPartitionNames(catName, dbName, tableName, max); + } + + @Override public PartitionValuesResponse listPartitionValues(String catName, String dbName, + String tableName, List cols, boolean applyDistinct, String filter, + boolean ascending, List order, long maxParts) throws MetaException { + incrementCall(); + return super + .listPartitionValues(catName, dbName, tableName, cols, applyDistinct, filter, ascending, + order, maxParts); + } + + @Override public List listPartitionsPsWithAuth(String catName, String db_name, + String tbl_name, List part_vals, short max_parts, String userName, + List groupNames) throws MetaException, InvalidObjectException, NoSuchObjectException { + incrementCall(); + return super + .listPartitionsPsWithAuth(catName, db_name, tbl_name, part_vals, max_parts, userName, + groupNames); + } + + @Override public List listPartitionNamesPs(String catName, String dbName, + String tableName, List part_vals, short max_parts) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.listPartitionNamesPs(catName, dbName, tableName, part_vals, max_parts); + } + + @Override public List getPartitionsByNames(String catName, String dbName, + String tblName, List partNames) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionsByNames(catName, dbName, tblName, partNames); + } + + @Override public boolean getPartitionsByExpr(String catName, String dbName, String tblName, + byte[] expr, String defaultPartitionName, short 
maxParts, List result) + throws TException { + incrementCall(); + return super.getPartitionsByExpr(catName, dbName, tblName, expr, defaultPartitionName, maxParts, + result); + } + + @Override protected boolean getPartitionsByExprInternal(String catName, String dbName, + String tblName, byte[] expr, String defaultPartitionName, short maxParts, + List result, boolean allowSql, boolean allowJdo) throws TException { + incrementCall(); + return super + .getPartitionsByExprInternal(catName, dbName, tblName, expr, defaultPartitionName, maxParts, + result, allowSql, allowJdo); + } + + @Override public List getPartitionsByFilter(String catName, String dbName, + String tblName, String filter, short maxParts) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionsByFilter(catName, dbName, tblName, filter, maxParts); + } + + @Override public int getNumPartitionsByFilter(String catName, String dbName, String tblName, + String filter) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getNumPartitionsByFilter(catName, dbName, tblName, filter); + } + + @Override public int getNumPartitionsByExpr(String catName, String dbName, String tblName, + byte[] expr) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getNumPartitionsByExpr(catName, dbName, tblName, expr); + } + + @Override protected List getPartitionsByFilterInternal(String catName, String dbName, + String tblName, String filter, short maxParts, boolean allowSql, boolean allowJdo) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionsByFilterInternal(catName, dbName, tblName, filter, maxParts, allowSql, + allowJdo); + } + + @Override public List getPartitionSpecsByFilterAndProjection(Table table, + GetPartitionsProjectionSpec partitionsProjectSpec, GetPartitionsFilterSpec filterSpec) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionSpecsByFilterAndProjection(table, partitionsProjectSpec, filterSpec); + } + + @Override public List listTableNamesByFilter(String catName, String dbName, String filter, + short maxTables) throws MetaException { + incrementCall(); + return super.listTableNamesByFilter(catName, dbName, filter, maxTables); + } + + @Override public List getTableColumnStatistics(String catName, String dbName, + String tableName, List colNames) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getTableColumnStatistics(catName, dbName, tableName, colNames); + } + + @Override public ColumnStatistics getTableColumnStatistics(String catName, String dbName, + String tableName, List colNames, String engine) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getTableColumnStatistics(catName, dbName, tableName, colNames, engine); + } + + @Override public ColumnStatistics getTableColumnStatistics(String catName, String dbName, + String tableName, List colNames, String engine, String writeIdList) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super + .getTableColumnStatistics(catName, dbName, tableName, colNames, engine, writeIdList); + } + + @Override public List> getPartitionColumnStatistics(String catName, + String dbName, String tableName, List partNames, List colNames) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionColumnStatistics(catName, dbName, tableName, partNames, colNames); + } + + @Override public List 
getPartitionColumnStatistics(String catName, + String dbName, String tableName, List partNames, List colNames, String engine) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super + .getPartitionColumnStatistics(catName, dbName, tableName, partNames, colNames, engine); + } + + @Override public List getPartitionColumnStatistics(String catName, + String dbName, String tableName, List partNames, List colNames, String engine, + String writeIdList) throws MetaException, NoSuchObjectException { + incrementCall(); + return super + .getPartitionColumnStatistics(catName, dbName, tableName, partNames, colNames, engine, + writeIdList); + } + + @Override public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName, + List partNames, List colNames, String engine, String writeIdList) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super + .get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, engine, writeIdList); + } + + @Override public List getPartitionColStatsForDatabase( + String catName, String dbName) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionColStatsForDatabase(catName, dbName); + } + + @Override public List getPrimaryKeys(String catName, String db_name, + String tbl_name) throws MetaException { + incrementCall(); + return super.getPrimaryKeys(catName, db_name, tbl_name); + } + + @Override public List getForeignKeys(String catName, String parent_db_name, + String parent_tbl_name, String foreign_db_name, String foreign_tbl_name) + throws MetaException { + incrementCall(); + return super.getForeignKeys(catName, parent_db_name, parent_tbl_name, foreign_db_name, + foreign_tbl_name); + } + + @Override public List getUniqueConstraints(String catName, String db_name, + String tbl_name) throws MetaException { + incrementCall(); + return super.getUniqueConstraints(catName, db_name, tbl_name); + } + + @Override public List getNotNullConstraints(String catName, String db_name, + String tbl_name) throws MetaException { + incrementCall(); + return super.getNotNullConstraints(catName, db_name, tbl_name); + } + + @Override public List getDefaultConstraints(String catName, String db_name, + String tbl_name) throws MetaException { + incrementCall(); + return super.getDefaultConstraints(catName, db_name, tbl_name); + } + + @Override public List getCheckConstraints(String catName, String db_name, + String tbl_name) throws MetaException { + incrementCall(); + return super.getCheckConstraints(catName, db_name, tbl_name); + } + + @Override public List getRuntimeStats(int maxEntries, int maxCreateTime) + throws MetaException { + incrementCall(); + return super.getRuntimeStats(maxEntries, maxCreateTime); + } +} diff --git a/standalone-metastore/DEV-README b/standalone-metastore/DEV-README index 84ed9383bb..ab5df26590 100644 --- a/standalone-metastore/DEV-README +++ b/standalone-metastore/DEV-README @@ -51,8 +51,6 @@ Supported databases for testing: -Dit.test=ITestPostgres -Dit.test=ITestSqlServer -By adding -Dverbose.schematool the Schema Tool output becomes more detailed. 
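For context, the call-counting store above only takes effect once the metastore is configured to use it as its RawStore. A minimal sketch of how a test could wire it in, assuming the standard metastore.rawstore.impl setting (MetastoreConf.ConfVars.RAW_STORE_IMPL), an embedded metastore client, and a class placed in the same package as the counting store (its counter accessors are package-private); the TestNumMetastoreCalls test that the comment refers to is not part of this diff and may set things up differently:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

    public class NumMetastoreCallsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = MetastoreConf.newMetastoreConf();
        // Assumption: RAW_STORE_IMPL ("metastore.rawstore.impl") selects the RawStore class;
        // with an embedded metastore the static counter lives in the same JVM as the test.
        MetastoreConf.setVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL,
            TestNumMetastoreCallsObjectStore.class.getName());
        HiveMetaStoreClient client = new HiveMetaStoreClient(conf);

        int before = TestNumMetastoreCallsObjectStore.getNumCalls();
        client.getDatabase("hive", "default");  // a single metastore lookup
        System.out.println("ObjectStore calls observed: "
            + (TestNumMetastoreCallsObjectStore.getNumCalls() - before));
        client.close();
      }
    }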
- You can download the Oracle driver at http://www.oracle.com/technetwork/database/features/jdbc/index-091264.html You should download Oracle 11g Release 1, ojdbc6.jar diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 0f94e13148..bae23f773e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -75,20 +75,20 @@ public CompactionTxnHandler() { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); // Check for completed transactions - String s = "SELECT DISTINCT \"TC\".\"CTC_DATABASE\", \"TC\".\"CTC_TABLE\", \"TC\".\"CTC_PARTITION\" " + - "FROM \"COMPLETED_TXN_COMPONENTS\" TC " + (checkInterval > 0 ? - "LEFT JOIN ( " + - " SELECT \"C1\".* FROM \"COMPLETED_COMPACTIONS\" \"C1\" " + - " INNER JOIN ( " + - " SELECT MAX(\"CC_ID\") \"CC_ID\" FROM \"COMPLETED_COMPACTIONS\" " + - " GROUP BY \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\"" + - " ) \"C2\" " + - " ON \"C1\".\"CC_ID\" = \"C2\".\"CC_ID\" " + - " WHERE \"C1\".\"CC_STATE\" IN (" + quoteChar(ATTEMPTED_STATE) + "," + quoteChar(FAILED_STATE) + ")" + - ") \"C\" " + - "ON \"TC\".\"CTC_DATABASE\" = \"C\".\"CC_DATABASE\" AND \"TC\".\"CTC_TABLE\" = \"C\".\"CC_TABLE\" " + - " AND (\"TC\".\"CTC_PARTITION\" = \"C\".\"CC_PARTITION\" OR (\"TC\".\"CTC_PARTITION\" IS NULL AND \"C\".\"CC_PARTITION\" IS NULL)) " + - "WHERE \"C\".\"CC_ID\" IS NOT NULL OR " + isWithinCheckInterval("\"TC\".\"CTC_TIMESTAMP\"", checkInterval) : ""); + String s = "select distinct tc.ctc_database, tc.ctc_table, tc.ctc_partition " + + "from COMPLETED_TXN_COMPONENTS tc " + (checkInterval > 0 ? 
+ "left join ( " + + " select c1.* from COMPLETED_COMPACTIONS c1 " + + " inner join ( " + + " select max(cc_id) cc_id from COMPLETED_COMPACTIONS " + + " group by cc_database, cc_table, cc_partition" + + " ) c2 " + + " on c1.cc_id = c2.cc_id " + + " where c1.cc_state IN (" + quoteChar(ATTEMPTED_STATE) + "," + quoteChar(FAILED_STATE) + ")" + + ") c " + + "on tc.ctc_database = c.cc_database and tc.ctc_table = c.cc_table " + + " and (tc.ctc_partition = c.cc_partition or (tc.ctc_partition is null and c.cc_partition is null)) " + + "where c.cc_id is not null or " + isWithinCheckInterval("tc.ctc_timestamp", checkInterval) : ""); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -102,11 +102,11 @@ public CompactionTxnHandler() { rs.close(); // Check for aborted txns - s = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " + - "FROM \"TXNS\", \"TXN_COMPONENTS\" " + - "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' " + - "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " + - "HAVING COUNT(*) > " + abortedThreshold; + s = "select tc_database, tc_table, tc_partition " + + "from TXNS, TXN_COMPONENTS " + + "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " + + "group by tc_database, tc_table, tc_partition " + + "having count(*) > " + abortedThreshold; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -151,8 +151,8 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + - "\"CQ_TYPE\", \"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + INITIATED_STATE + "'"; + String s = "select cq_id, cq_database, cq_table, cq_partition, " + + "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -171,9 +171,9 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { info.properties = rs.getString(6); // Now, update this record as being worked on by this worker. 
long now = getDbTime(dbConn); - s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = '" + workerId + "', " + - "\"CQ_START\" = " + now + ", \"CQ_STATE\" = '" + WORKING_STATE + "' WHERE \"CQ_ID\" = " + info.id + - " AND \"CQ_STATE\"='" + INITIATED_STATE + "'"; + s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + + "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id + + " AND cq_state='" + INITIATED_STATE + "'"; LOG.debug("Going to execute update <" + s + ">"); int updCount = updStmt.executeUpdate(s); if(updCount == 1) { @@ -221,8 +221,8 @@ public void markCompacted(CompactionInfo info) throws MetaException { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', " + - "\"CQ_WORKER_ID\" = NULL WHERE \"CQ_ID\" = " + info.id; + String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + + "cq_worker_id = null where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); int updCnt = stmt.executeUpdate(s); if (updCnt != 1) { @@ -265,8 +265,8 @@ public void markCompacted(CompactionInfo info) throws MetaException { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " - + "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + String s = "select cq_id, cq_database, cq_table, cq_partition, " + + "cq_type, cq_run_as, cq_highest_write_id from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -333,7 +333,7 @@ public long findMinOpenTxnId() throws MetaException { * which deletes rows from MIN_HISTORY_LEVEL which can only allow minOpenTxn to move higher) */ private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLException { - String s = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""; + String s = "select ntxn_next from NEXT_TXN_ID"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -341,7 +341,7 @@ private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLExceptio "initialized, no record found in next_txn_id"); } long hwm = rs.getLong(1); - s = "SELECT MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\""; + s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); rs.next(); @@ -369,10 +369,9 @@ public void markCleaned(CompactionInfo info) throws MetaException { ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " - + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", " - + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\" " - + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, " + + "CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, " + + "CQ_HADOOP_JOB_ID, CQ_ERROR_MESSAGE from COMPACTION_QUEUE WHERE CQ_ID = ?"); pStmt.setLong(1, info.id); rs = pStmt.executeQuery(); if(rs.next()) { @@ 
-392,11 +391,10 @@ public void markCleaned(CompactionInfo info) throws MetaException { LOG.debug("Going to rollback"); dbConn.rollback(); } - pStmt = dbConn.prepareStatement("INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", \"CC_DATABASE\", " - + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", " - + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", " - + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\")" - + " VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?)"); + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, " + + "CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, " + + "CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID, CC_ERROR_MESSAGE) " + + "VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?)"); info.state = SUCCEEDED_STATE; CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn)); updCount = pStmt.executeUpdate(); @@ -404,13 +402,13 @@ public void markCleaned(CompactionInfo info) throws MetaException { // Remove entries from completed_txn_components as well, so we don't start looking there // again but only up to the highest write ID include in this compaction job. //highestWriteId will be NULL in upgrade scenarios - s = "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_DATABASE\" = ? AND " + - "\"CTC_TABLE\" = ?"; + s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " + + "ctc_table = ?"; if (info.partName != null) { - s += " AND \"CTC_PARTITION\" = ?"; + s += " and ctc_partition = ?"; } if(info.highestWriteId != 0) { - s += " AND \"CTC_WRITEID\" <= ?"; + s += " and ctc_writeid <= ?"; } pStmt = dbConn.prepareStatement(s); int paramCount = 1; @@ -433,10 +431,10 @@ public void markCleaned(CompactionInfo info) throws MetaException { * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns). * See {@link ql.txn.compactor.Cleaner.removeFiles()} */ - s = "SELECT DISTINCT \"TXN_ID\" FROM \"TXNS\", \"TXN_COMPONENTS\" WHERE \"TXN_ID\" = \"TC_TXNID\" " - + "AND \"TXN_STATE\" = '" + TXN_ABORTED + "' AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ?"; - if (info.highestWriteId != 0) s += " AND \"TC_WRITEID\" <= ?"; - if (info.partName != null) s += " AND \"TC_PARTITION\" = ?"; + s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + + TXN_ABORTED + "' and tc_database = ? 
and tc_table = ?"; + if (info.highestWriteId != 0) s += " and tc_writeid <= ?"; + if (info.partName != null) s += " and tc_partition = ?"; pStmt = dbConn.prepareStatement(s); paramCount = 1; @@ -466,18 +464,18 @@ public void markCleaned(CompactionInfo info) throws MetaException { StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); - prefix.append("DELETE FROM \"TXN_COMPONENTS\" WHERE "); + prefix.append("delete from TXN_COMPONENTS where "); //because 1 txn may include different partitions/tables even in auto commit mode - suffix.append(" AND \"TC_DATABASE\" = ?"); - suffix.append(" AND \"TC_TABLE\" = ?"); + suffix.append(" and tc_database = ?"); + suffix.append(" and tc_table = ?"); if (info.partName != null) { - suffix.append(" AND \"TC_PARTITION\" = ?"); + suffix.append(" and tc_partition = ?"); } // Populate the complete query with provided prefix and suffix List counts = TxnUtils - .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "\"TC_TXNID\"", + .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "tc_txnid", true, false); int totalCount = 0; for (int i = 0; i < queries.size(); i++) { @@ -546,7 +544,7 @@ public void cleanTxnToWriteIdTable() throws MetaException { // If there are aborted txns, then the minimum aborted txnid could be the min_uncommitted_txnid // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL .mhl_min_open_txnid). - String s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED); + String s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_ABORTED); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (rs.next()) { @@ -557,7 +555,7 @@ public void cleanTxnToWriteIdTable() throws MetaException { } // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table. - s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " + minUncommittedTxnId; + s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + minUncommittedTxnId; LOG.debug("Going to execute delete <" + s + ">"); int rc = stmt.executeUpdate(s); LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommittedTxnId); @@ -596,9 +594,9 @@ public void cleanEmptyAbortedTxns() throws MetaException { //after that, so READ COMMITTED is sufficient. dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " + - "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " + - "\"TXN_STATE\" = '" + TXN_ABORTED + "'"; + String s = "select txn_id from TXNS where " + + "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + + "txn_state = '" + TXN_ABORTED + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); List txnids = new ArrayList<>(); @@ -614,10 +612,10 @@ public void cleanEmptyAbortedTxns() throws MetaException { StringBuilder suffix = new StringBuilder(); // Delete from TXNS. 
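Both IN-clause helpers used here (buildQueryWithINClauseStrings in markCleaned and buildQueryWithINClause in cleanEmptyAbortedTxns) follow the same pattern: wrap a fixed prefix and suffix around a "column IN (...)" list and split the id list across several statements so no single query exceeds the database limits. A simplified sketch of that batching idea, with a hard-coded batch size standing in for the configuration-driven limits of the real TxnUtils helpers:

    import java.util.ArrayList;
    import java.util.List;

    final class InClauseBatchingSketch {
      // Illustration only: the real TxnUtils helpers also handle prepared-statement
      // placeholders, negated (NOT IN) clauses, and limits read from the metastore conf.
      static List<String> batchInClause(String prefix, String suffix, String column,
          List<Long> ids, int batchSize) {
        List<String> queries = new ArrayList<>();
        for (int i = 0; i < ids.size(); i += batchSize) {
          List<Long> batch = ids.subList(i, Math.min(i + batchSize, ids.size()));
          StringBuilder sb = new StringBuilder(prefix).append(column).append(" in (");
          for (int j = 0; j < batch.size(); j++) {
            if (j > 0) {
              sb.append(',');
            }
            sb.append(batch.get(j));
          }
          queries.add(sb.append(')').append(suffix).toString());
        }
        return queries;
      }
    }

    // e.g. batchInClause("delete from TXNS where ", "", "txn_id", txnids, 1000) yields one
    // "delete from TXNS where txn_id in (...)" statement per batch of 1000 transaction ids.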
- prefix.append("DELETE FROM \"TXNS\" WHERE "); + prefix.append("delete from TXNS where "); suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", false, false); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); @@ -660,8 +658,8 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL, \"CQ_STATE\" = '" - + INITIATED_STATE+ "' WHERE \"CQ_STATE\" = '" + WORKING_STATE + "' AND \"CQ_WORKER_ID\" LIKE '" + String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" + + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" + hostname + "%'"; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died @@ -705,8 +703,8 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); long latestValidStart = getDbTime(dbConn) - timeout; stmt = dbConn.createStatement(); - String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL, \"CQ_STATE\" = '" - + INITIATED_STATE+ "' WHERE \"CQ_STATE\" = '" + WORKING_STATE + "' AND \"CQ_START\" < " + String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" + + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " + latestValidStart; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died @@ -802,9 +800,9 @@ public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String sqlText = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HIGHEST_WRITE_ID\" = " + - ci.highestWriteId + ", \"CQ_RUN_AS\" = " + quoteString(ci.runAs) + - " WHERE \"CQ_ID\" = " + ci.id; + String sqlText = "UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " + + ci.highestWriteId + ", cq_run_as = " + quoteString(ci.runAs) + + " WHERE CQ_ID = " + ci.id; if(LOG.isDebugEnabled()) { LOG.debug("About to execute: " + sqlText); } @@ -820,13 +818,13 @@ public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws * a new write id (so as not to invalidate result set caches/materialized views) but * we need to set it to something to that markCleaned() only cleans TXN_COMPONENTS up to * the level to which aborted files/data has been cleaned.*/ - sqlText = "INSERT INTO \"TXN_COMPONENTS\"(" + - "\"TC_TXNID\", " + - "\"TC_DATABASE\", " + - "\"TC_TABLE\", " + - (ci.partName == null ? "" : "\"TC_PARTITION\", ") + - "\"TC_WRITEID\", " + - "\"TC_OPERATION_TYPE\")" + + sqlText = "insert into TXN_COMPONENTS(" + + "TC_TXNID, " + + "TC_DATABASE, " + + "TC_TABLE, " + + (ci.partName == null ? 
"" : "TC_PARTITION, ") + + "TC_WRITEID, " + + "TC_OPERATION_TYPE)" + " VALUES(" + compactionTxnId + "," + quoteString(ci.dbname) + "," + @@ -909,8 +907,8 @@ public void purgeCompactionHistory() throws MetaException { stmt = dbConn.createStatement(); /*cc_id is monotonically increasing so for any entity sorts in order of compaction history, thus this query groups by entity and withing group sorts most recent first*/ - rs = stmt.executeQuery("SELECT \"CC_ID\", \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\" " - + "FROM \"COMPLETED_COMPACTIONS\" ORDER BY \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_ID\" DESC"); + rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " + + "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc"); String lastCompactedEntity = null; /*In each group, walk from most recent and count occurences of each state type. Once you * have counted enough (for each state) to satisfy retention policy, delete all other @@ -936,15 +934,14 @@ public void purgeCompactionHistory() throws MetaException { StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); - prefix.append("DELETE FROM \"COMPLETED_COMPACTIONS\" WHERE "); + prefix.append("delete from COMPLETED_COMPACTIONS where "); suffix.append(""); List questions = new ArrayList<>(deleteSet.size()); for (int i = 0; i < deleteSet.size(); i++) { questions.add("?"); } - List counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, - "\"CC_ID\"", false, false); + List counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false); int totalCount = 0; for (int i = 0; i < queries.size(); i++) { String query = queries.get(i); @@ -1006,11 +1003,11 @@ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - pStmt = dbConn.prepareStatement("SELECT \"CC_STATE\" FROM \"COMPLETED_COMPACTIONS\" WHERE " + - "\"CC_DATABASE\" = ? AND " + - "\"CC_TABLE\" = ? " + - (ci.partName != null ? "AND \"CC_PARTITION\" = ?" : "") + - " AND \"CC_STATE\" != " + quoteChar(ATTEMPTED_STATE) + " ORDER BY \"CC_ID\" DESC"); + pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " + + "CC_DATABASE = ? and " + + "CC_TABLE = ? " + + (ci.partName != null ? "and CC_PARTITION = ?" 
: "") + + " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc"); pStmt.setString(1, ci.dbname); pStmt.setString(2, ci.tableName); if (ci.partName != null) { @@ -1065,15 +1062,14 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " - + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", " - + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\" " - + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, " + + "CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, " + + "CQ_HADOOP_JOB_ID, CQ_ERROR_MESSAGE from COMPACTION_QUEUE WHERE CQ_ID = ?"); pStmt.setLong(1, ci.id); rs = pStmt.executeQuery(); if(rs.next()) { ci = CompactionInfo.loadFullFromCompactionQueue(rs); - String s = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"; + String s = "delete from COMPACTION_QUEUE where cq_id = ?"; pStmt = dbConn.prepareStatement(s); pStmt.setLong(1, ci.id); LOG.debug("Going to execute update <" + s + ">"); @@ -1102,10 +1098,9 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho close(rs, stmt, null); closeStmt(pStmt); - pStmt = dbConn.prepareStatement("INSERT INTO \"COMPLETED_COMPACTIONS\" " - + "(\"CC_ID\", \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", " - + "\"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", \"CC_START\", \"CC_END\", \"CC_RUN_AS\", " - + "\"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", \"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\") " + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, " + + "CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, " + + "CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID, CC_ERROR_MESSAGE) " + "VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?)"); if (errorMessage != null) { ci.errorMessage = errorMessage; @@ -1143,8 +1138,7 @@ public void setHadoopJobId(String hadoopJobId, long id) { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HADOOP_JOB_ID\" = " + quoteString(hadoopJobId) - + " WHERE \"CQ_ID\" = " + id; + String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id; LOG.debug("Going to execute <" + s + ">"); int updateCount = stmt.executeUpdate(s); LOG.debug("Going to commit"); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java index a6d22d19ef..3f82891ef6 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java @@ -61,10 +61,6 @@ private boolean verbose; - public DatabaseRule() { - verbose = System.getProperty("verbose.schematool") != null; - } - public DatabaseRule setVerbose(boolean verbose) { this.verbose = verbose; 
return this;
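With the system-property constructor gone (the -Dverbose.schematool note was removed from DEV-README above for the same reason), verbose schematool output is now opted into explicitly through the remaining setVerbose builder. A hypothetical usage, assuming the Postgres subclass of DatabaseRule that backs ITestPostgres and that the rule is attached as a regular JUnit rule:

    import org.junit.Rule;

    public class ITestVerboseSketch {
      // Hypothetical: request detailed schematool output per rule instance instead of
      // via the removed -Dverbose.schematool system property.
      @Rule
      public DatabaseRule rule = new Postgres().setVerbose(true);
    }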