diff --git a/beeline/pom.xml b/beeline/pom.xml
index 19ec53eba6..0bf065d802 100644
--- a/beeline/pom.xml
+++ b/beeline/pom.xml
@@ -105,6 +105,12 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-server-extensions</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index e6113942dd..358d63beb0 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -332,7 +332,7 @@ public PartitionFiles next() {
Partition p = partitionIter.next();
Iterator<String> fileIterator;
//For transactional tables, the actual file copy will be done by acid write event during replay of commit txn.
- if (!TxnUtils.isTransactionalTable(t)) {
+ if (!TxnUtils.isTransactionalTable(t) && p.getSd() != null) {
List<String> files = Lists.newArrayList(new FileIterator(p.getSd().getLocation()));
fileIterator = files.iterator();
} else {
@@ -759,7 +759,8 @@ public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumn
.buildUpdateTableColumnStatMessage(updateTableColumnStatEvent.getColStats(),
updateTableColumnStatEvent.getTableObj(),
updateTableColumnStatEvent.getTableParameters(),
- updateTableColumnStatEvent.getWriteId());
+ updateTableColumnStatEvent.getWriteId(),
+ updateTableColumnStatEvent.getWriteIds());
NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_TABLE_COLUMN_STAT.toString(),
msgEncoder.getSerializer().serialize(msg));
ColumnStatisticsDesc statDesc = updateTableColumnStatEvent.getColStats().getStatsDesc();
@@ -789,7 +790,8 @@ public void onUpdatePartitionColumnStat(UpdatePartitionColumnStatEvent updatePar
updatePartColStatEvent.getPartVals(),
updatePartColStatEvent.getPartParameters(),
updatePartColStatEvent.getTableObj(),
- updatePartColStatEvent.getWriteId());
+ updatePartColStatEvent.getWriteId(),
+ updatePartColStatEvent.getWriteIds());
NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_PARTITION_COLUMN_STAT.toString(),
msgEncoder.getSerializer().serialize(msg));
ColumnStatisticsDesc statDesc = updatePartColStatEvent.getPartColStats().getStatsDesc();
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java
index efafe0c641..afa17613fa 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java
@@ -208,7 +208,7 @@ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
Configuration conf = handler.getConf();
Table newTbl;
try {
- newTbl = handler.get_table_core(tbl.getCatName(), tbl.getDbName(), tbl.getTableName())
+ newTbl = handler.get_table_core(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), null)
.deepCopy();
newTbl.getParameters().put(
HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 0212e076cd..0e1df69656 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -263,11 +263,6 @@ public boolean dropTable(String catName, String dbName, String tableName)
}
}
- @Override
- public Table getTable(String catName, String dbName, String tableName) throws MetaException {
- return objectStore.getTable(catName, dbName, tableName);
- }
-
@Override
public Table getTable(String catName, String dbName, String tableName,
String writeIdList) throws MetaException {
@@ -280,12 +275,6 @@ public boolean addPartition(Partition part)
return objectStore.addPartition(part);
}
- @Override
- public Partition getPartition(String catName, String dbName, String tableName, List<String> partVals)
- throws MetaException, NoSuchObjectException {
- return objectStore.getPartition(catName, dbName, tableName, partVals);
- }
-
@Override
public Partition getPartition(String catName, String dbName, String tableName,
List<String> partVals, String writeIdList)
@@ -305,15 +294,15 @@ public boolean dropPartition(String catName, String dbName, String tableName, Li
}
@Override
- public List<Partition> getPartitions(String catName, String dbName, String tableName, int max)
+ public List<Partition> getPartitions(String catName, String dbName, String tableName, int max, String writeIdList)
throws MetaException, NoSuchObjectException {
- return objectStore.getPartitions(catName, dbName, tableName, max);
+ return objectStore.getPartitions(catName, dbName, tableName, max, writeIdList);
}
@Override
public Map<String, String> getPartitionLocations(String catName, String dbName, String tblName,
- String baseLocationToNotShow, int max) {
- return objectStore.getPartitionLocations(catName, dbName, tblName, baseLocationToNotShow, max);
+ String baseLocationToNotShow, int max, String writeIdList) {
+ return objectStore.getPartitionLocations(catName, dbName, tblName, baseLocationToNotShow, max, writeIdList);
}
@Override
@@ -378,9 +367,9 @@ public Table alterTable(String catName, String dbName, String name, Table newTab
}
@Override
- public List<String> listPartitionNames(String catName, String dbName, String tblName, short maxParts)
+ public List<String> listPartitionNames(String catName, String dbName, String tblName, short maxParts, String writeIdList)
throws MetaException {
- return objectStore.listPartitionNames(catName, dbName, tblName, maxParts);
+ return objectStore.listPartitionNames(catName, dbName, tblName, maxParts, writeIdList);
}
@Override
@@ -388,7 +377,7 @@ public PartitionValuesResponse listPartitionValues(String catName, String db_nam
String tbl_name, List<FieldSchema> cols,
boolean applyDistinct, String filter,
boolean ascending, List<FieldSchema> order,
- long maxParts) throws MetaException {
+ long maxParts, String writeIdList) throws MetaException {
return null;
}
@@ -416,42 +405,43 @@ public Partition alterPartition(String catName, String dbName, String tblName, L
@Override
public List<Partition> getPartitionsByFilter(String catName, String dbName, String tblName,
- String filter, short maxParts) throws MetaException, NoSuchObjectException {
- return objectStore.getPartitionsByFilter(catName, dbName, tblName, filter, maxParts);
+ String filter, short maxParts, String writeIdList) throws MetaException, NoSuchObjectException {
+ return objectStore.getPartitionsByFilter(catName, dbName, tblName, filter, maxParts, writeIdList);
}
@Override
public List<Partition> getPartitionSpecsByFilterAndProjection(Table table,
- GetPartitionsProjectionSpec projectionSpec, GetPartitionsFilterSpec filterSpec)
+ GetPartitionsProjectionSpec projectionSpec, GetPartitionsFilterSpec filterSpec, String writeIdList)
throws MetaException, NoSuchObjectException {
- return objectStore.getPartitionSpecsByFilterAndProjection(table, projectionSpec, filterSpec);
+ return objectStore.getPartitionSpecsByFilterAndProjection(table, projectionSpec, filterSpec, writeIdList);
}
@Override
public int getNumPartitionsByFilter(String catName, String dbName, String tblName,
- String filter) throws MetaException, NoSuchObjectException {
- return objectStore.getNumPartitionsByFilter(catName, dbName, tblName, filter);
+ String filter, String writeIdList) throws MetaException, NoSuchObjectException {
+ return objectStore.getNumPartitionsByFilter(catName, dbName, tblName, filter, writeIdList);
}
@Override
public int getNumPartitionsByExpr(String catName, String dbName, String tblName,
- byte[] expr) throws MetaException, NoSuchObjectException {
- return objectStore.getNumPartitionsByExpr(catName, dbName, tblName, expr);
+ byte[] expr, String writeIdList) throws MetaException, NoSuchObjectException {
+ return objectStore.getNumPartitionsByExpr(catName, dbName, tblName, expr, writeIdList);
}
@Override
public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
- List<String> partNames)
+ List<String> partNames, String writeIdList)
throws MetaException, NoSuchObjectException {
return objectStore.getPartitionsByNames(
- catName, dbName, tblName, partNames);
+ catName, dbName, tblName, partNames, writeIdList);
}
@Override
public boolean getPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr,
- String defaultPartitionName, short maxParts, List<Partition> result) throws TException {
+ String defaultPartitionName, short maxParts, List<Partition> result,
+ String writeIdList) throws TException {
return objectStore.getPartitionsByExpr(catName,
- dbName, tblName, expr, defaultPartitionName, maxParts, result);
+ dbName, tblName, expr, defaultPartitionName, maxParts, result, writeIdList);
}
@Override
@@ -622,34 +612,36 @@ public Role getRole(String roleName) throws NoSuchObjectException {
@Override
public Partition getPartitionWithAuth(String catName, String dbName, String tblName,
- List<String> partVals, String userName, List<String> groupNames)
+ List<String> partVals, String userName,
+ List<String> groupNames, String writeIdList)
throws MetaException, NoSuchObjectException, InvalidObjectException {
return objectStore.getPartitionWithAuth(catName, dbName, tblName, partVals, userName,
- groupNames);
+ groupNames, writeIdList);
}
@Override
public List<Partition> getPartitionsWithAuth(String catName, String dbName, String tblName,
- short maxParts, String userName, List<String> groupNames)
+ short maxParts, String userName,
+ List<String> groupNames, String writeIdList)
throws MetaException, NoSuchObjectException, InvalidObjectException {
return objectStore.getPartitionsWithAuth(catName, dbName, tblName, maxParts, userName,
- groupNames);
+ groupNames, writeIdList);
}
@Override
public List<String> listPartitionNamesPs(String catName, String dbName, String tblName,
- List<String> partVals, short maxParts)
+ List<String> partVals, short maxParts, String writeIdList)
throws MetaException, NoSuchObjectException {
- return objectStore.listPartitionNamesPs(catName, dbName, tblName, partVals, maxParts);
+ return objectStore.listPartitionNamesPs(catName, dbName, tblName, partVals, maxParts, writeIdList);
}
@Override
public List<Partition> listPartitionsPsWithAuth(String catName, String dbName, String tblName,
List<String> partVals, short maxParts, String userName,
- List<String> groupNames)
+ List<String> groupNames, String writeIdList)
throws MetaException, InvalidObjectException, NoSuchObjectException {
return objectStore.listPartitionsPsWithAuth(catName, dbName, tblName, partVals, maxParts,
- userName, groupNames);
+ userName, groupNames, writeIdList);
}
@Override
@@ -720,12 +712,6 @@ public long cleanupEvents() {
return objectStore.listTableColumnGrantsAll(catName, dbName, tableName, columnName);
}
- @Override
- public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tableName,
- List<String> colNames) throws MetaException, NoSuchObjectException {
- return objectStore.getTableColumnStatistics(catName, dbName, tableName, colNames);
- }
-
@Override
public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tableName,
List<String> colNames,
@@ -817,14 +803,6 @@ public void setMetaStoreSchemaVersion(String schemaVersion, String comment) thro
}
- @Override
- public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName,
- String tblName, List<String> colNames,
- List<String> partNames)
- throws MetaException, NoSuchObjectException {
- return objectStore.getPartitionColumnStatistics(catName, dbName, tblName , colNames, partNames);
- }
-
@Override
public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName,
String tblName, List<String> colNames,
@@ -837,9 +815,9 @@ public void setMetaStoreSchemaVersion(String schemaVersion, String comment) thro
@Override
public boolean doesPartitionExist(String catName, String dbName, String tableName,
- List<FieldSchema> partKeys, List<String> partVals)
+ List<FieldSchema> partKeys, List<String> partVals, String writeIdList)
throws MetaException, NoSuchObjectException {
- return objectStore.doesPartitionExist(catName, dbName, tableName, partKeys, partVals);
+ return objectStore.doesPartitionExist(catName, dbName, tableName, partKeys, partVals, writeIdList);
}
@Override
@@ -905,13 +883,6 @@ public Function getFunction(String catName, String dbName, String funcName)
return objectStore.getFunctions(catName, dbName, pattern);
}
- @Override
- public AggrStats get_aggr_stats_for(String catName, String dbName,
- String tblName, List<String> partNames, List<String> colNames)
- throws MetaException {
- return null;
- }
-
@Override
public AggrStats get_aggr_stats_for(String catName, String dbName,
String tblName, List<String> partNames, List<String> colNames,
@@ -1324,5 +1295,4 @@ public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
NoSuchObjectException {
return null;
}
-
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
index 285f30b008..74fc40232d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
@@ -10,6 +10,7 @@
import org.apache.hadoop.hive.metastore.*;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.MergedColumnStatsForPartitions;
import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -44,7 +45,6 @@ public void setUp() throws Exception {
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
MetastoreConf.setVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS, DbNotificationListener.class.getName());
MetastoreConf.setVar(conf, ConfVars.RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.cache.CachedStore");
- MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT, true);
MetastoreConf.setBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED, true);
MetastoreConf.setBoolVar(conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED, false);
MetaStoreTestUtils.setConfForStandloneMode(conf);
@@ -120,84 +120,6 @@ private void comparePartitions(Partition part1, Partition part2) {
Assert.assertEquals(part1.getLastAccessTime(), part2.getLastAccessTime());
}
- @Test
- public void testDatabaseOpsForUpdateUsingEvents() throws Exception {
- RawStore rawStore = hmsHandler.getMS();
-
- // Prewarm CachedStore
- CachedStore.setCachePrewarmedState(false);
- CachedStore.prewarm(rawStore);
-
- // Add a db via rawStore
- String dbName = "testDatabaseOps";
- String dbOwner = "user1";
- Database db = createTestDb(dbName, dbOwner);
-
- hmsHandler.create_database(db);
- db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName);
-
- // Read database via CachedStore
- Database dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName);
- Assert.assertEquals(db, dbRead);
-
- // Add another db via rawStore
- final String dbName1 = "testDatabaseOps1";
- Database db1 = createTestDb(dbName1, dbOwner);
- hmsHandler.create_database(db1);
- db1 = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
-
- // Read database via CachedStore
- dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName1);
- Assert.assertEquals(db1, dbRead);
-
- // Alter the db via rawStore (can only alter owner or parameters)
- dbOwner = "user2";
- Database newdb = new Database(db);
- newdb.setOwnerName(dbOwner);
- hmsHandler.alter_database(dbName, newdb);
- newdb = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName);
-
- // Read db via cachedStore
- dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName);
- Assert.assertEquals(newdb, dbRead);
-
- // Add another db via rawStore
- final String dbName2 = "testDatabaseOps2";
- Database db2 = createTestDb(dbName2, dbOwner);
- hmsHandler.create_database(db2);
- db2 = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName2);
-
- // Alter db "testDatabaseOps" via rawStore
- dbOwner = "user1";
- newdb = new Database(db);
- newdb.setOwnerName(dbOwner);
- hmsHandler.alter_database(dbName, newdb);
- newdb = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName);
-
- // Drop db "testDatabaseOps1" via rawStore
- Database dropDb = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
- hmsHandler.drop_database(dbName1, true, true);
-
- // Read the newly added db via CachedStore
- dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName2);
- Assert.assertEquals(db2, dbRead);
-
- // Read the altered db via CachedStore (altered user from "user2" to "user1")
- dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName);
- Assert.assertEquals(newdb, dbRead);
-
- // Try to read the dropped db after cache update
- dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName1);
- Assert.assertEquals(null, dbRead);
-
- // Clean up
- hmsHandler.drop_database(dbName, true, true);
- hmsHandler.drop_database(dbName2, true, true);
- sharedCache.getDatabaseCache().clear();
- sharedCache.clearTableCache();
- sharedCache.getSdCache().clear();
- }
-
@Test
public void testTableOpsForUpdateUsingEvents() throws Exception {
long lastEventId = -1;
@@ -205,7 +127,7 @@ public void testTableOpsForUpdateUsingEvents() throws Exception {
// Prewarm CachedStore
CachedStore.setCachePrewarmedState(false);
- CachedStore.prewarm(rawStore);
+ CachedStore.prewarm(rawStore, conf);
// Add a db via rawStore
String dbName = "test_table_ops";
@@ -225,19 +147,17 @@ public void testTableOpsForUpdateUsingEvents() throws Exception {
List<FieldSchema> ptnCols = new ArrayList<>();
Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols);
hmsHandler.create_table(tbl);
- tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
+ tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName, null);
- // Read database, table via CachedStore
- Database dbRead= sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName);
- Assert.assertEquals(db, dbRead);
- Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName);
+ // Read table via CachedStore
+ Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, null);
compareTables(tblRead, tbl);
// Add a new table via rawStore
String tblName2 = "tbl2";
Table tbl2 = createTestTbl(dbName, tblName2, tblOwner, cols, ptnCols);
hmsHandler.create_table(tbl2);
- tbl2 = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName2);
+ tbl2 = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName2, null);
// Alter table "tbl" via rawStore
tblOwner = "role1";
@@ -245,7 +165,7 @@ public void testTableOpsForUpdateUsingEvents() throws Exception {
newTable.setOwner(tblOwner);
newTable.setOwnerType(PrincipalType.ROLE);
hmsHandler.alter_table(dbName, tblName, newTable);
- newTable = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
+ newTable = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName, null);
Assert.assertEquals("Owner of the table did not change.", tblOwner, newTable.getOwner());
Assert.assertEquals("Owner type of the table did not change", PrincipalType.ROLE, newTable.getOwnerType());
@@ -254,23 +174,22 @@ public void testTableOpsForUpdateUsingEvents() throws Exception {
hmsHandler.drop_table(dbName, tblName2, true);
// Read the altered "tbl" via CachedStore
- tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName);
+ tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, null);
compareTables(tblRead, newTable);
// Try to read the dropped "tbl2" via CachedStore (should throw exception)
- tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName2);
+ tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName2, null);
Assert.assertNull(tblRead);
// Clean up
hmsHandler.drop_database(dbName, true, true);
- tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName2);
+ tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName2, null);
Assert.assertNull(tblRead);
- tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName);
+ tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, null);
Assert.assertNull(tblRead);
- sharedCache.getDatabaseCache().clear();
sharedCache.clearTableCache();
sharedCache.getSdCache().clear();
}
@@ -282,7 +201,7 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {
// Prewarm CachedStore
CachedStore.setCachePrewarmedState(false);
- CachedStore.prewarm(rawStore);
+ CachedStore.prewarm(rawStore, conf);
// Add a db via rawStore
String dbName = "test_partition_ops";
@@ -304,7 +223,7 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {
ptnCols.add(ptnCol1);
Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols);
hmsHandler.create_table(tbl);
- tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
+ tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName, null);
final String ptnColVal1 = "aaa";
Map<String, String> partParams = new HashMap<>();
@@ -313,7 +232,7 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {
0, tbl.getSd(), partParams);
ptn1.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.add_partition(ptn1);
- ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1));
+ ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1), null);
final String ptnColVal2 = "bbb";
Partition ptn2 =
@@ -321,13 +240,10 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {
0, tbl.getSd(), partParams);
ptn2.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.add_partition(ptn2);
- ptn2 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2));
+ ptn2 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2), null);
- // Read database, table, partition via CachedStore
- Database dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase());
- Assert.assertEquals(db, dbRead);
Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
- dbName.toLowerCase(), tblName.toLowerCase());
+ dbName.toLowerCase(), tblName.toLowerCase(), null);
compareTables(tbl, tblRead);
Partition ptn1Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal1));
@@ -343,20 +259,20 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {
0, tbl.getSd(), partParams);
ptn3.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.add_partition(ptn3);
- ptn3 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3));
+ ptn3 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3), null);
// Alter an existing partition ("aaa") via rawStore
- ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1));
+ ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1), null);
final String ptnColVal1Alt = "aaa";
Partition ptn1Atl =
new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0,
0, tbl.getSd(), partParams);
ptn1Atl.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.alter_partitions(dbName, tblName, Arrays.asList(ptn1Atl));
- ptn1Atl = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt));
+ ptn1Atl = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt), null);
// Drop an existing partition ("bbb") via rawStore
- Partition ptnDrop = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2));
+ Partition ptnDrop = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2), null);
hmsHandler.drop_partition(dbName, tblName, Arrays.asList(ptnColVal2), false);
// Read the newly added partition via CachedStore
@@ -382,13 +298,12 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {
// Clean up
rawStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName);
- sharedCache.getDatabaseCache().clear();
sharedCache.clearTableCache();
sharedCache.getSdCache().clear();
}
- private void updateTableColStats(String dbName, String tblName, String[] colName,
- double highValue, double avgColLen, boolean isTxnTable) throws Throwable {
+ private long updateTableColStats(String dbName, String tblName, String[] colName,
+ double highValue, double avgColLen, boolean isTxnTable, long lastEventId) throws Throwable {
long writeId = -1;
String validWriteIds = null;
if (isTxnTable) {
@@ -412,6 +327,7 @@ private void updateTableColStats(String dbName, String tblName, String[] colName
// write stats objs persistently
hmsHandler.update_table_column_statistics_req(setTblColStat);
+ lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId, null);
validateTablePara(dbName, tblName);
ColumnStatistics colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
@@ -423,10 +339,11 @@ private void updateTableColStats(String dbName, String tblName, String[] colName
dbName, tblName, Lists.newArrayList(colName[1]), validWriteIds, true);
Assert.assertEquals(colStatsCache.getStatsObj().get(0).getColName(), colName[1]);
verifyStatString(colStatsCache.getStatsObj().get(0), colName[1], avgColLen);
+ return lastEventId;
}
- private void updatePartColStats(String dbName, String tblName, boolean isTxnTable, String[] colName,
- String partName, double highValue, double avgColLen) throws Throwable {
+ private long updatePartColStats(String dbName, String tblName, boolean isTxnTable, String[] colName,
+ String partName, double highValue, double avgColLen, long lastEventId) throws Throwable {
long writeId = -1;
String validWriteIds = null;
List<Long> txnIds = null;
@@ -471,7 +388,7 @@ private void updatePartColStats(String dbName, String tblName, boolean isTxnTabl
} else {
Assert.assertEquals(statRowStore.get(0).isIsStatsCompliant(), false);
}
-
+ lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId, conf);
List statSharedCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME,
dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]),
validWriteIds, true);
@@ -489,6 +406,8 @@ private void updatePartColStats(String dbName, String tblName, boolean isTxnTabl
statPartCache = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
CachedStore.partNameToVals(partName), colName[1], validWriteIds);
verifyStatString(statPartCache.getColumnStatisticsObj(), colName[1], avgColLen);
+
+ return lastEventId;
}
private List getStatsObjects(String dbName, String tblName, String[] colName,
@@ -572,7 +491,7 @@ private void setUpBeforeTest(String dbName, String tblName, String[] colName, bo
// Prewarm CachedStore
CachedStore.setCachePrewarmedState(false);
- CachedStore.prewarm(rawStore);
+ CachedStore.prewarm(rawStore, conf);
// Add a db via rawStore
Database db = createTestDb(dbName, dbOwner);
@@ -670,8 +589,8 @@ private String getValidWriteIds(String dbName, String tblName) throws Throwable
}
private void validateTablePara(String dbName, String tblName) throws Throwable {
- Table tblRead = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
- Table tblRead1 = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName);
+ Table tblRead = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName, null);
+ Table tblRead1 = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, null);
Assert.assertEquals(tblRead.getParameters(), tblRead1.getParameters());
}
@@ -706,18 +625,19 @@ private void testTableColStatInternal(String dbName, String tblName, boolean isT
String[] colName = new String[]{"income", "name"};
double highValue = 1200000.4525;
double avgColLen = 50.30;
+ long lastEventId = 0;
setUpBeforeTest(dbName, tblName, colName, isTxnTable);
- updateTableColStats(dbName, tblName, colName, highValue, avgColLen, isTxnTable);
+ lastEventId = updateTableColStats(dbName, tblName, colName, highValue, avgColLen, isTxnTable, lastEventId);
if (!isTxnTable) {
deleteColStats(dbName, tblName, colName);
}
tblName = "tbl_part";
createTableWithPart(dbName, tblName, colName, isTxnTable);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1, null);
String partName = partitions.get(0);
- updatePartColStats(dbName, tblName, isTxnTable, colName, partName, highValue, avgColLen);
+ lastEventId = updatePartColStats(dbName, tblName, isTxnTable, colName, partName, highValue, avgColLen, lastEventId);
if (!isTxnTable) {
deletePartColStats(dbName, tblName, colName, partName);
}
@@ -747,11 +667,12 @@ public void testTableColumnStatisticsTxnTableMulti() throws Throwable {
setUpBeforeTest(dbName, null, colName, true);
createTableWithPart(dbName, tblName, colName, true);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1, null);
String partName = partitions.get(0);
- updatePartColStats(dbName, tblName, true, colName, partName, highValue, avgColLen);
- updatePartColStats(dbName, tblName, true, colName, partName, 1200000.4521, avgColLen);
- updatePartColStats(dbName, tblName, true, colName, partName, highValue, 34.78);
+ long lastEventId = 0;
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partName, highValue, avgColLen, lastEventId);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partName, 1200000.4521, avgColLen, lastEventId);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partName, highValue, 34.78, lastEventId);
}
@Test
@@ -761,10 +682,11 @@ public void testTableColumnStatisticsTxnTableMultiAbort() throws Throwable {
String[] colName = new String[]{"income", "name"};
double highValue = 1200000.4525;
double avgColLen = 50.30;
+ long lastEventId = 0;
setUpBeforeTest(dbName, null, colName, true);
createTableWithPart(dbName, tblName, colName, true);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1, null);
String partName = partitions.get(0);
List<Long> txnIds = allocateTxns(1);
@@ -804,6 +726,7 @@ public void testTableColumnStatisticsTxnTableMultiAbort() throws Throwable {
verifyStat(statRawStore.get(0).getStatsObj(), colName, highValue, avgColLen);
Assert.assertEquals(statRawStore.get(0).isIsStatsCompliant(), false);
+ lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId, conf);
List statsListFromCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME,
dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]),
validWriteIds, true);
@@ -824,14 +747,15 @@ public void testTableColumnStatisticsTxnTableOpenTxn() throws Throwable {
String[] colName = new String[]{"income", "name"};
double highValue = 1200000.4121;
double avgColLen = 23.30;
+ long lastEventId = 0;
setUpBeforeTest(dbName, null, colName, true);
createTableWithPart(dbName, tblName, colName, true);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1, null);
String partName = partitions.get(0);
// update part col stats successfully.
- updatePartColStats(dbName, tblName, true, colName, partName, 1.2, 12.2);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partName, 1.2, 12.2, lastEventId);
List<Long> txnIds = allocateTxns(1);
long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
@@ -854,6 +778,7 @@ public void testTableColumnStatisticsTxnTableOpenTxn() throws Throwable {
// write stats objs persistently
hmsHandler.update_partition_column_statistics_req(setTblColStat);
+ lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId, conf);
// keep the txn open and verify that the stats got is not compliant.
@@ -904,9 +829,9 @@ private void verifyAggrStat(String dbName, String tblName, String[] colName, Lis
Assert.assertEquals(aggrStatsCached, aggrStats);
//Assert.assertEquals(aggrStatsCached.isIsStatsCompliant(), true);
- List<ColumnStatisticsObj> stats = sharedCache.getAggrStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
- Collections.singletonList(colName[0]), SharedCache.StatsType.ALL);
- Assert.assertEquals(stats.get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
+ MergedColumnStatsForPartitions stats = CachedStore.mergeColStatsForPartitions(DEFAULT_CATALOG_NAME, dbName, tblName, Lists.newArrayList("income=1", "income=2"),
+ Collections.singletonList(colName[0]), sharedCache, SharedCache.StatsType.ALL, validWriteIds, false, 0.0);
+ Assert.assertEquals(stats.colStats.get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
}
@Test
@@ -917,15 +842,17 @@ public void testAggrStat() throws Throwable {
setUpBeforeTest(dbName, null, colName, false);
createTableWithPart(dbName, tblName, colName, false);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short) -1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short) -1, null);
String partName = partitions.get(0);
// update part col stats successfully.
- updatePartColStats(dbName, tblName, false, colName, partitions.get(0), 2, 12);
- updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 4, 10);
+ long lastEventId = 0;
+ lastEventId = updatePartColStats(dbName, tblName, false, colName, partitions.get(0), 2, 12, lastEventId);
+ lastEventId = updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 4, 10, lastEventId);
+ lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId, conf);
verifyAggrStat(dbName, tblName, colName, partitions, false, 4);
- updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 3, 10);
+ lastEventId = updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 3, 10, lastEventId);
verifyAggrStat(dbName, tblName, colName, partitions, false, 3);
}
@@ -934,18 +861,19 @@ public void testAggrStatTxnTable() throws Throwable {
String dbName = "aggr_stats_test_db_txn";
String tblName = "tbl_part";
String[] colName = new String[]{"income", "name"};
+ long lastEventId = 0;
setUpBeforeTest(dbName, null, colName, true);
createTableWithPart(dbName, tblName, colName, true);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1, null);
String partName = partitions.get(0);
// update part col stats successfully.
- updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12);
- updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12, lastEventId);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10, lastEventId);
verifyAggrStat(dbName, tblName, colName, partitions, true, 4);
- updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 3, 10);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 3, 10, lastEventId);
verifyAggrStat(dbName, tblName, colName, partitions, true, 3);
List<Long> txnIds = allocateTxns(1);
@@ -988,15 +916,16 @@ public void testAggrStatAbortTxn() throws Throwable {
String dbName = "aggr_stats_test_db_txn_abort";
String tblName = "tbl_part";
String[] colName = new String[]{"income", "name"};
+ long lastEventId = 0;
setUpBeforeTest(dbName, null, colName, true);
createTableWithPart(dbName, tblName, colName, true);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1, null);
String partName = partitions.get(0);
// update part col stats successfully.
- updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12);
- updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12, lastEventId);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10, lastEventId);
verifyAggrStat(dbName, tblName, colName, partitions, true, 4);
List<Long> txnIds = allocateTxns(4);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 61be5a3a5b..6ab6574fc9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1699,6 +1699,9 @@ public boolean isWriteIdAborted(long writeid) {
public RangeResponse isWriteIdRangeAborted(long minWriteId, long maxWriteId) {
return RangeResponse.ALL;
}
+
+ @Override
+ public void commitWriteId(long writeId) {};
};
OrcInputFormat aif = new OrcInputFormat();
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index d2c2ccd5ea..017db09c10 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -157,8 +157,9 @@ public void initConf() throws Exception {
}
// Plug verifying metastore in for testing DirectSQL.
- conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
-
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.cache.CachedStore");
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS, "org.apache.hive.hcatalog.listener.DbNotificationListener");
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
miniClusters.initConf(conf);
}
@@ -289,6 +290,7 @@ public void clearTablesCreatedDuringTests() throws Exception {
conf.set("hive.metastore.filter.hook", "org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl");
db = Hive.get(conf);
+ SessionState.get().initTxnMgr(conf);
// First delete any MVs to avoid race conditions
for (String dbName : db.getAllDatabases()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index ae622c8be5..cc86799a32 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1738,35 +1738,38 @@ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnMa
}
// If we've opened a transaction we need to commit or rollback rather than explicitly
// releasing the locks.
- conf.unset(ValidTxnList.VALID_TXNS_KEY);
- conf.unset(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
if(!checkConcurrency()) {
return;
}
- if (txnMgr.isTxnOpen()) {
- if (commit) {
- if(conf.getBoolVar(ConfVars.HIVE_IN_TEST) && conf.getBoolVar(ConfVars.HIVETESTMODEROLLBACKTXN)) {
+ try {
+ if (txnMgr.isTxnOpen()) {
+ if (commit) {
+ if(conf.getBoolVar(ConfVars.HIVE_IN_TEST) && conf.getBoolVar(ConfVars.HIVETESTMODEROLLBACKTXN)) {
+ txnMgr.rollbackTxn();
+ }
+ else {
+ txnMgr.commitTxn();//both commit & rollback clear ALL locks for this tx
+ }
+ } else {
txnMgr.rollbackTxn();
}
- else {
- txnMgr.commitTxn();//both commit & rollback clear ALL locks for this tx
- }
} else {
- txnMgr.rollbackTxn();
+ //since there is no tx, we only have locks for current query (if any)
+ if (ctx != null && ctx.getHiveLocks() != null) {
+ hiveLocks.addAll(ctx.getHiveLocks());
+ }
+ txnMgr.releaseLocks(hiveLocks);
}
- } else {
- //since there is no tx, we only have locks for current query (if any)
- if (ctx != null && ctx.getHiveLocks() != null) {
- hiveLocks.addAll(ctx.getHiveLocks());
+ } finally {
+ hiveLocks.clear();
+ if (ctx != null) {
+ ctx.setHiveLocks(null);
}
- txnMgr.releaseLocks(hiveLocks);
- }
- hiveLocks.clear();
- if (ctx != null) {
- ctx.setHiveLocks(null);
- }
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
+ conf.unset(ValidTxnList.VALID_TXNS_KEY);
+ conf.unset(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
+ }
}
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index 267f7d041f..1077421ac4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -21,6 +21,8 @@
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.LineageState;
import org.apache.hadoop.hive.ql.session.SessionState;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 295fe7cbd0..be4d4dc334 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1969,6 +1969,42 @@ public static TableSnapshot getTableSnapshot(Configuration conf,
validWriteIdList != null ? validWriteIdList.toString() : null);
}
+ /**
+ * This is called by Hive.java for all write (DDL) operations. It advances the write id
+ * for the table via the transaction manager and stores it in the config. The write id
+ * is marked as committed in the config immediately: all DDL is auto-committed, so there
+ * is no chance to roll back.
+ */
+ public static ValidWriteIdList advanceWriteId(HiveConf conf, Table tbl) throws LockException {
+ if (!isTransactionalTable(tbl)) {
+ return null;
+ }
+ HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
+ long writeId = SessionState.get().getTxnMgr().getTableWriteId(tbl.getDbName(), tbl.getTableName());
+ List<String> txnTables = new ArrayList<>();
+ String fullTableName = getFullTableName(tbl.getDbName(), tbl.getTableName());
+ txnTables.add(fullTableName);
+ ValidTxnWriteIdList txnWriteIds;
+ if (conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY) != null) {
+ txnWriteIds = new ValidTxnWriteIdList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+ } else {
+ String txnString;
+ if (conf.get(ValidTxnList.VALID_TXNS_KEY) != null) {
+ txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ } else {
+ ValidTxnList txnIds = txnMgr.getValidTxns();
+ txnString = txnIds.toString();
+ }
+ txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
+ }
+ ValidWriteIdList writeIds = txnWriteIds.getTableValidWriteIdList(fullTableName);
+ if (writeIds != null) {
+ writeIds.commitWriteId(writeId);
+ conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, txnWriteIds.toString());
+ }
+ return writeIds;
+ }
+
/**
* Returns ValidWriteIdList for the table with the given "dbName" and "tableName".
* This is called when HiveConf has no list for the table.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index d412dd72d1..82fb21f2ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -680,7 +680,6 @@ private void stopHeartbeat() throws LockException {
@Override
public ValidTxnList getValidTxns() throws LockException {
- assert isTxnOpen();
init();
try {
return getMS().getValidTxns(txnId);
@@ -692,7 +691,6 @@ public ValidTxnList getValidTxns() throws LockException {
@Override
public ValidTxnWriteIdList getValidWriteIds(List<String> tableList,
String validTxnList) throws LockException {
- assert isTxnOpen();
assert validTxnList != null && !validTxnList.isEmpty();
try {
return TxnCommonUtils.createValidTxnWriteIdList(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 2ae1db57aa..ed151eef92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -111,6 +111,7 @@
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc;
import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableDropPartitionDesc;
@@ -691,6 +692,7 @@ public void alterTable(String catName, String dbName, String tblName, Table newT
EnvironmentContext environmentContext, boolean transactional, long replWriteId)
throws HiveException {
+ boolean txnOpened = false;
if (catName == null) {
catName = getDefaultCatalog(conf);
}
@@ -721,6 +723,11 @@ public void alterTable(String catName, String dbName, String tblName, Table newT
replWriteId);
tableSnapshot = new TableSnapshot(replWriteId, writeIds.writeToString());
} else {
+ if (AcidUtils.isTransactionalTable(newTbl)) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, newTbl);
+ }
// Make sure we pass in the names, so we can get the correct snapshot for rename table.
tableSnapshot = AcidUtils.getTableSnapshot(conf, newTbl, dbName, tblName, true);
}
@@ -739,6 +746,12 @@ public void alterTable(String catName, String dbName, String tblName, Table newT
throw new HiveException("Unable to alter table. " + e.getMessage(), e);
} catch (TException e) {
throw new HiveException("Unable to alter table. " + e.getMessage(), e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
@@ -790,6 +803,7 @@ public void alterPartition(String tblName, Partition newPart,
public void alterPartition(String catName, String dbName, String tblName, Partition newPart,
EnvironmentContext environmentContext, boolean transactional)
throws InvalidOperationException, HiveException {
+ boolean txnOpened = false;
try {
if (catName == null) {
catName = getDefaultCatalog(conf);
@@ -803,6 +817,13 @@ public void alterPartition(String catName, String dbName, String tblName, Partit
if (environmentContext == null) {
environmentContext = new EnvironmentContext();
}
+
+ if (AcidUtils.isTransactionalTable(newPart.getTable())) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, newPart.getTable());
+ }
+
AcidUtils.TableSnapshot tableSnapshot = null;
if (transactional) {
tableSnapshot = AcidUtils.getTableSnapshot(conf, newPart.getTable(), true);
@@ -820,6 +841,12 @@ public void alterPartition(String catName, String dbName, String tblName, Partit
throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
} catch (TException e) {
throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
@@ -847,10 +874,16 @@ private void validatePartition(Partition newPart) throws HiveException {
public void alterPartitions(String tblName, List<Partition> newParts,
EnvironmentContext environmentContext, boolean transactional)
throws InvalidOperationException, HiveException {
+ boolean txnOpened = false;
String[] names = Utilities.getDbTableName(tblName);
List<org.apache.hadoop.hive.metastore.api.Partition> newTParts =
new ArrayList<>();
try {
+ if (AcidUtils.isTransactionalTable(newParts.get(0).getTable())) {
+ // Advance writeId for ddl on transactional table
+ txnOpened = openTxnIfNeeded();
+ AcidUtils.advanceWriteId(conf, newParts.get(0).getTable());
+ }
AcidUtils.TableSnapshot tableSnapshot = null;
if (transactional) {
tableSnapshot = AcidUtils.getTableSnapshot(conf, newParts.get(0).getTable(), true);
@@ -874,6 +907,12 @@ public void alterPartitions(String tblName, List newParts,
throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
} catch (TException e) {
throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
/**
@@ -890,6 +929,7 @@ public void alterPartitions(String tblName, List newParts,
public void renamePartition(Table tbl, Map oldPartSpec, Partition newPart,
long replWriteId)
throws HiveException {
+ boolean txnOpened = false;
try {
Map<String, String> newPartSpec = newPart.getSpec();
if (oldPartSpec.keySet().size() != tbl.getPartCols().size()
@@ -923,6 +963,11 @@ public void renamePartition(Table tbl, Map oldPartSpec, Partitio
tbl.getTableName()), new long[0], new BitSet(), replWriteId);
tableSnapshot = new TableSnapshot(replWriteId, writeIds.writeToString());
} else {
+ if (AcidUtils.isTransactionalTable(tbl)) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, tbl);
+ }
// Set table snapshot to api.Table to make it persistent.
tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
}
@@ -942,6 +987,12 @@ public void renamePartition(Table tbl, Map oldPartSpec, Partitio
throw new HiveException("Unable to rename partition. " + e.getMessage(), e);
} catch (TException e) {
throw new HiveException("Unable to rename partition. " + e.getMessage(), e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
@@ -1001,6 +1052,7 @@ public void createTable(Table tbl, boolean ifNotExists,
List<SQLDefaultConstraint> defaultConstraints,
List<SQLCheckConstraint> checkConstraints)
throws HiveException {
+ boolean txnOpened = false;
try {
if (tbl.getDbName() == null || "".equals(tbl.getDbName().trim())) {
tbl.setDbName(SessionState.get().getCurrentDatabase());
@@ -1025,6 +1077,11 @@ public void createTable(Table tbl, boolean ifNotExists,
tTbl.setPrivileges(principalPrivs);
}
}
+ if (AcidUtils.isTransactionalTable(tbl)) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, tbl);
+ }
// Set table snapshot to api.Table to make it persistent. A transactional table being
// replicated may have a valid write Id copied from the source. Use that instead of
// crafting one on the replica.
@@ -1050,6 +1107,12 @@ public void createTable(Table tbl, boolean ifNotExists,
}
} catch (Exception e) {
throw new HiveException(e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
@@ -1144,7 +1207,18 @@ public void dropTable(String dbName, String tableName, boolean deleteData,
*/
public void dropTable(String dbName, String tableName, boolean deleteData,
boolean ignoreUnknownTab, boolean ifPurge) throws HiveException {
+ boolean txnOpened = false;
try {
+ Table tbl = null;
+ try {
+ tbl = getTable(dbName, tableName);
+ } catch (InvalidTableException e) {
+ }
+ if (tbl != null && AcidUtils.isTransactionalTable(tbl)) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, tbl);
+ }
getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab, ifPurge);
} catch (NoSuchObjectException e) {
if (!ignoreUnknownTab) {
@@ -1159,6 +1233,12 @@ public void dropTable(String dbName, String tableName, boolean deleteData,
throw new HiveException(e);
} catch (Exception e) {
throw new HiveException(e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
@@ -1172,13 +1252,19 @@ public void dropTable(String dbName, String tableName, boolean deleteData,
* @throws HiveException
*/
public void truncateTable(String dbDotTableName, Map<String, String> partSpec, Long writeId) throws HiveException {
+ boolean txnOpened = false;
try {
Table table = getTable(dbDotTableName, true);
+
AcidUtils.TableSnapshot snapshot = null;
if (AcidUtils.isTransactionalTable(table)) {
if (writeId <= 0) {
snapshot = AcidUtils.getTableSnapshot(conf, table, true);
} else {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, table);
+
String fullTableName = getFullTableName(table.getDbName(), table.getTableName());
ValidWriteIdList writeIdList = getMSC().getValidWriteIds(fullTableName, writeId);
snapshot = new TableSnapshot(writeId, writeIdList.writeToString());
@@ -1196,6 +1282,12 @@ public void truncateTable(String dbDotTableName, Map partSpec, L
}
} catch (Exception e) {
throw new HiveException(e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
@@ -1985,48 +2077,61 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par
boolean isSrcLocal, boolean isAcidIUDoperation,
boolean resetStatistics, Long writeId,
int stmtId, boolean isInsertOverwrite) throws HiveException {
+ boolean txnOpened = false;
+ try {
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
-
- // Get the partition object if it already exists
- Partition oldPart = getPartition(tbl, partSpec, false);
- boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
+ // Get the partition object if it already exists
+ Partition oldPart = getPartition(tbl, partSpec, false);
+ boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
- // If config is set, table is not temporary and partition being inserted exists, capture
- // the list of files added. For not yet existing partitions (insert overwrite to new partition
- // or dynamic partition inserts), the add partition event will capture the list of files added.
- List<Path> newFiles = Collections.synchronizedList(new ArrayList<>());
+ // If config is set, table is not temporary and partition being inserted exists, capture
+ // the list of files added. For not yet existing partitions (insert overwrite to new partition
+ // or dynamic partition inserts), the add partition event will capture the list of files added.
+ List<Path> newFiles = Collections.synchronizedList(new ArrayList<>());
- Partition newTPart = loadPartitionInternal(loadPath, tbl, partSpec, oldPart,
- loadFileType, inheritTableSpecs,
- inheritLocation, isSkewedStoreAsSubdir, isSrcLocal, isAcidIUDoperation,
- resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+ Partition newTPart = loadPartitionInternal(loadPath, tbl, partSpec, oldPart,
+ loadFileType, inheritTableSpecs,
+ inheritLocation, isSkewedStoreAsSubdir, isSrcLocal, isAcidIUDoperation,
+ resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
- AcidUtils.TableSnapshot tableSnapshot = isTxnTable ? getTableSnapshot(tbl, writeId) : null;
- if (tableSnapshot != null) {
- newTPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
- }
-
- if (oldPart == null) {
- addPartitionToMetastore(newTPart, resetStatistics, tbl, tableSnapshot);
- // For acid table, add the acid_write event with file list at the time of load itself. But
- // it should be done after partition is created.
- if (isTxnTable && (null != newFiles)) {
- addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
+ if (AcidUtils.isTransactionalTable(tbl)) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, tbl);
}
- } else {
- try {
- setStatsPropAndAlterPartition(resetStatistics, tbl, newTPart, tableSnapshot);
- } catch (TException e) {
- LOG.error(StringUtils.stringifyException(e));
- throw new HiveException(e);
+ AcidUtils.TableSnapshot tableSnapshot = isTxnTable ? getTableSnapshot(tbl, writeId) : null;
+ if (tableSnapshot != null) {
+ newTPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
}
- }
- perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
+ if (oldPart == null) {
+ addPartitionToMetastore(newTPart, resetStatistics, tbl, tableSnapshot);
+ // For acid table, add the acid_write event with file list at the time of load itself. But
+ // it should be done after partition is created.
+ if (isTxnTable && (null != newFiles)) {
+ addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
+ }
+ } else {
+ try {
+ setStatsPropAndAlterPartition(resetStatistics, tbl, newTPart, tableSnapshot);
+ } catch (TException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new HiveException(e);
+ }
+ }
+
+ perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
- return newTPart;
+ return newTPart;
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
+ }
}
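Both load paths above call AcidUtils.advanceWriteId(conf, tbl) right after opening the transaction and before taking the table snapshot, so the snapshot and the partition metadata carry a write id valid for this transaction. The helper is introduced elsewhere in this patch; the snippet below is only a sketch of one plausible shape, assuming it allocates the table write id through the session's HiveTxnManager (the conf parameter is unused in this simplified form) — not the actual implementation from this patch.

  // Assumed shape only; the real implementation lives outside this section.
  public static void advanceWriteId(Configuration conf, Table tbl) throws LockException {
    HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
    if (txnMgr.isTxnOpen()) {
      // Allocates (or returns the already allocated) write id for this table in the open txn.
      txnMgr.getTableWriteId(tbl.getDbName(), tbl.getTableName());
    }
  }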
/**
@@ -2634,206 +2739,219 @@ private void constructOneLBLocationMap(FileStatus fSta,
final int numDP, final int numLB, final boolean isAcid, final long writeId, final int stmtId,
final boolean resetStatistics, final AcidUtils.Operation operation,
boolean isInsertOverwrite) throws HiveException {
+ boolean txnOpened = false;
+ try {
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
-
- // Get all valid partition paths and existing partitions for them (if any)
- final Table tbl = getTable(tableName);
- final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId,
- AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite);
-
- final int partsToLoad = validPartitions.size();
- final AtomicInteger partitionsLoaded = new AtomicInteger(0);
- final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
- && InPlaceUpdate.canRenderInPlace(conf) && !SessionState.getConsole().getIsSilent();
- final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
+ // Get all valid partition paths and existing partitions for them (if any)
+ final Table tbl = getTable(tableName);
+ final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId,
+ AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite);
- final SessionState parentSession = SessionState.get();
- List<Callable<Partition>> tasks = Lists.newLinkedList();
+ final int partsToLoad = validPartitions.size();
+ final AtomicInteger partitionsLoaded = new AtomicInteger(0);
+ final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
+ && InPlaceUpdate.canRenderInPlace(conf) && !SessionState.getConsole().getIsSilent();
+ final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
- final class PartitionDetails {
- Map<String, String> fullSpec;
- Partition partition;
- List<Path> newFiles;
- boolean hasOldPartition = false;
- AcidUtils.TableSnapshot tableSnapshot;
- }
-
- Map<Path, PartitionDetails> partitionDetailsMap =
- Collections.synchronizedMap(new LinkedHashMap<>());
+ final SessionState parentSession = SessionState.get();
+ List<Callable<Partition>> tasks = Lists.newLinkedList();
- // calculate full path spec for each valid partition path
- validPartitions.forEach(partPath -> {
- Map<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
- if (!Warehouse.makeSpecFromName(fullPartSpec, partPath, new HashSet<>(partSpec.keySet()))) {
- Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath);
- } else {
- PartitionDetails details = new PartitionDetails();
- details.fullSpec = fullPartSpec;
- partitionDetailsMap.put(partPath, details);
+ final class PartitionDetails {
+ Map<String, String> fullSpec;
+ Partition partition;
+ List<Path> newFiles;
+ boolean hasOldPartition = false;
+ AcidUtils.TableSnapshot tableSnapshot;
}
- });
- // fetch all the partitions matching the part spec using the partition iterable
- // this way the maximum batch size configuration parameter is considered
- PartitionIterable partitionIterable = new PartitionIterable(Hive.get(), tbl, partSpec,
- conf.getInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 300));
- Iterator<Partition> iterator = partitionIterable.iterator();
+ Map<Path, PartitionDetails> partitionDetailsMap =
+ Collections.synchronizedMap(new LinkedHashMap<>());
- // Match valid partition path to partitions
- while (iterator.hasNext()) {
- Partition partition = iterator.next();
- partitionDetailsMap.entrySet().stream()
- .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec()))
- .findAny().ifPresent(entry -> {
- entry.getValue().partition = partition;
- entry.getValue().hasOldPartition = true;
- });
- }
+ // calculate full path spec for each valid partition path
+ validPartitions.forEach(partPath -> {
+ Map<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
+ if (!Warehouse.makeSpecFromName(fullPartSpec, partPath, new HashSet<>(partSpec.keySet()))) {
+ Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath);
+ } else {
+ PartitionDetails details = new PartitionDetails();
+ details.fullSpec = fullPartSpec;
+ partitionDetailsMap.put(partPath, details);
+ }
+ });
- boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
- AcidUtils.TableSnapshot tableSnapshot = isTxnTable ? getTableSnapshot(tbl, writeId) : null;
+ // fetch all the partitions matching the part spec using the partition iterable
+ // this way the maximum batch size configuration parameter is considered
+ PartitionIterable partitionIterable = new PartitionIterable(Hive.get(), tbl, partSpec,
+ conf.getInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 300));
+ Iterator<Partition> iterator = partitionIterable.iterator();
- for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
- tasks.add(() -> {
- PartitionDetails partitionDetails = entry.getValue();
- Map<String, String> fullPartSpec = partitionDetails.fullSpec;
- try {
+ // Match valid partition path to partitions
+ while (iterator.hasNext()) {
+ Partition partition = iterator.next();
+ partitionDetailsMap.entrySet().stream()
+ .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec()))
+ .findAny().ifPresent(entry -> {
+ entry.getValue().partition = partition;
+ entry.getValue().hasOldPartition = true;
+ });
+ }
- SessionState.setCurrentSessionState(parentSession);
- LOG.info("New loading path = " + entry.getKey() + " withPartSpec " + fullPartSpec);
+ boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
+ if (isTxnTable) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for DDL on transactional table
+ AcidUtils.advanceWriteId(conf, tbl);
+ }
+ AcidUtils.TableSnapshot tableSnapshot = isTxnTable ? getTableSnapshot(tbl, writeId) : null;
- List<Path> newFiles = Lists.newArrayList();
- Partition oldPartition = partitionDetails.partition;
- // load the partition
- Partition partition = loadPartitionInternal(entry.getKey(), tbl,
- fullPartSpec, oldPartition, loadFileType, true, false, numLB > 0, false, isAcid,
- resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
- // if the partition already existed before the loading, no need to add it again to the
- // metastore
+ for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
+ tasks.add(() -> {
+ PartitionDetails partitionDetails = entry.getValue();
+ Map<String, String> fullPartSpec = partitionDetails.fullSpec;
+ try {
- if (tableSnapshot != null) {
- partition.getTPartition().setWriteId(tableSnapshot.getWriteId());
- }
- partitionDetails.tableSnapshot = tableSnapshot;
- if (oldPartition == null) {
- partitionDetails.newFiles = newFiles;
- partitionDetails.partition = partition;
- }
+ SessionState.setCurrentSessionState(parentSession);
+ LOG.info("New loading path = " + entry.getKey() + " withPartSpec " + fullPartSpec);
+
+ List<Path> newFiles = Lists.newArrayList();
+ Partition oldPartition = partitionDetails.partition;
+ // load the partition
+ Partition partition = loadPartitionInternal(entry.getKey(), tbl,
+ fullPartSpec, oldPartition, loadFileType, true, false, numLB > 0, false, isAcid,
+ resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+ // if the partition already existed before the loading, no need to add it again to the
+ // metastore
+
+ if (tableSnapshot != null) {
+ partition.getTPartition().setWriteId(tableSnapshot.getWriteId());
+ }
+ partitionDetails.tableSnapshot = tableSnapshot;
+ if (oldPartition == null) {
+ partitionDetails.newFiles = newFiles;
+ partitionDetails.partition = partition;
+ }
- if (inPlaceEligible) {
- synchronized (ps) {
- InPlaceUpdate.rePositionCursor(ps);
- partitionsLoaded.incrementAndGet();
- InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
- + partsToLoad + " partitions.");
+ if (inPlaceEligible) {
+ synchronized (ps) {
+ InPlaceUpdate.rePositionCursor(ps);
+ partitionsLoaded.incrementAndGet();
+ InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+ + partsToLoad + " partitions.");
+ }
}
+
+ return partition;
+ } catch (Exception e) {
+ LOG.error("Exception when loading partition with parameters "
+ + " partPath=" + entry.getKey() + ", "
+ + " table=" + tbl.getTableName() + ", "
+ + " partSpec=" + fullPartSpec + ", "
+ + " loadFileType=" + loadFileType.toString() + ", "
+ + " listBucketingLevel=" + numLB + ", "
+ + " isAcid=" + isAcid + ", "
+ + " resetStatistics=" + resetStatistics, e);
+ throw e;
}
+ });
+ }
- return partition;
- } catch (Exception e) {
- LOG.error("Exception when loading partition with parameters "
- + " partPath=" + entry.getKey() + ", "
- + " table=" + tbl.getTableName() + ", "
- + " partSpec=" + fullPartSpec + ", "
- + " loadFileType=" + loadFileType.toString() + ", "
- + " listBucketingLevel=" + numLB + ", "
- + " isAcid=" + isAcid + ", "
- + " resetStatistics=" + resetStatistics, e);
- throw e;
+ int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
+ ExecutorService executor = Executors.newFixedThreadPool(poolSize,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("load-dynamic-partitionsToAdd-%d").build());
+
+ List<Future<Partition>> futures = Lists.newLinkedList();
+ Map