diff --git a/beeline/pom.xml b/beeline/pom.xml
index 19ec53eba6..0bf065d802 100644
--- a/beeline/pom.xml
+++ b/beeline/pom.xml
@@ -105,6 +105,12 @@
      <classifier>tests</classifier>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-server-extensions</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-service</artifactId>
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 5f9d809ab2..6959febf42 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -333,7 +333,7 @@ public PartitionFiles next() {
Partition p = partitionIter.next();
Iterator<String> fileIterator;
//For transactional tables, the actual file copy will be done by acid write event during replay of commit txn.
- if (!TxnUtils.isTransactionalTable(t)) {
+ if (!TxnUtils.isTransactionalTable(t) && p.getSd() != null) {
List<String> files = Lists.newArrayList(new FileIterator(p.getSd().getLocation()));
fileIterator = files.iterator();
} else {
@@ -760,7 +760,8 @@ public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumn
.buildUpdateTableColumnStatMessage(updateTableColumnStatEvent.getColStats(),
updateTableColumnStatEvent.getTableObj(),
updateTableColumnStatEvent.getTableParameters(),
- updateTableColumnStatEvent.getWriteId());
+ updateTableColumnStatEvent.getWriteId(),
+ updateTableColumnStatEvent.getWriteIds());
NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_TABLE_COLUMN_STAT.toString(),
msgEncoder.getSerializer().serialize(msg));
ColumnStatisticsDesc statDesc = updateTableColumnStatEvent.getColStats().getStatsDesc();
@@ -790,7 +791,8 @@ public void onUpdatePartitionColumnStat(UpdatePartitionColumnStatEvent updatePar
updatePartColStatEvent.getPartVals(),
updatePartColStatEvent.getPartParameters(),
updatePartColStatEvent.getTableObj(),
- updatePartColStatEvent.getWriteId());
+ updatePartColStatEvent.getWriteId(),
+ updatePartColStatEvent.getWriteIds());
NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_PARTITION_COLUMN_STAT.toString(),
msgEncoder.getSerializer().serialize(msg));
ColumnStatisticsDesc statDesc = updatePartColStatEvent.getPartColStats().getStatsDesc();
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java
index efafe0c641..afa17613fa 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java
@@ -208,7 +208,7 @@ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
Configuration conf = handler.getConf();
Table newTbl;
try {
- newTbl = handler.get_table_core(tbl.getCatName(), tbl.getDbName(), tbl.getTableName())
+ newTbl = handler.get_table_core(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), null)
.deepCopy();
newTbl.getParameters().put(
HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 4dc04f46fd..6f12eb84c4 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -391,16 +391,16 @@ public void testNoBuckets() throws Exception {
Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
- Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
- Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
- Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
- Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
- Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
- Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
- Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
- Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000002_0000002_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000003_0000004/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000003_0000004/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000003_0000004/bucket_00000"));
+ Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
+ Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000003_0000004/bucket_00000"));
queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
@@ -415,14 +415,14 @@ public void testNoBuckets() throws Exception {
runWorker(conf);
rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
- Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
- Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
- Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
- Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
- Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
- Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000006_v0000025/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000006_v0000025/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000006_v0000025/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000006_v0000025/bucket_00000"));
}
/**
@@ -906,7 +906,7 @@ private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) thro
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -918,11 +918,11 @@ private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) thro
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}");
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -974,7 +974,7 @@ private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws E
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -986,11 +986,11 @@ private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws E
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}");
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -1036,7 +1036,7 @@ public void testTransactionBatchCommit_Json() throws Exception {
txnBatch.write(rec1.getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -1163,7 +1163,7 @@ public void testTransactionBatchAbortAndCommit() throws Exception {
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -1182,13 +1182,13 @@ public void testMultipleTransactionBatchCommits() throws Exception {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
- checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello streaming");
+ checkDataWritten2(partLoc, 3, 12, 1, validationQuery, false, "1\tHello streaming");
txnBatch.beginNextTransaction();
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten2(partLoc, 1, 10, 1, validationQuery, true, "1\tHello streaming",
+ checkDataWritten2(partLoc, 3, 12, 1, validationQuery, true, "1\tHello streaming",
"2\tWelcome to streaming");
txnBatch.close();
@@ -1199,14 +1199,14 @@ public void testMultipleTransactionBatchCommits() throws Exception {
txnBatch.write("3,Hello streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten2(partLoc, 1, 20, 2, validationQuery, false, "1\tHello streaming",
+ checkDataWritten2(partLoc, 3, 22, 2, validationQuery, false, "1\tHello streaming",
"2\tWelcome to streaming", "3\tHello streaming - once again");
txnBatch.beginNextTransaction();
txnBatch.write("4,Welcome to streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten2(partLoc, 1, 20, 2, validationQuery, true, "1\tHello streaming",
+ checkDataWritten2(partLoc, 3, 22, 2, validationQuery, true, "1\tHello streaming",
"2\tWelcome to streaming", "3\tHello streaming - once again",
"4\tWelcome to streaming - once again");
@@ -1243,7 +1243,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
txnBatch2.commit();
String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
- checkDataWritten2(partLoc, 11, 20, 1,
+ checkDataWritten2(partLoc, 13, 22, 1,
validationQuery, true, "3\tHello streaming - once again");
txnBatch1.commit();
@@ -1263,7 +1263,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
Assert.assertTrue("", logicalLength == actualLength);
}
}
- checkDataWritten2(partLoc, 1, 20, 2,
+ checkDataWritten2(partLoc, 3, 22, 2,
validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again");
txnBatch1.beginNextTransaction();
@@ -1288,19 +1288,19 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
Assert.assertTrue("", logicalLength <= actualLength);
}
}
- checkDataWritten2(partLoc, 1, 20, 2,
+ checkDataWritten2(partLoc, 3, 22, 2,
validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again");
txnBatch1.commit();
- checkDataWritten2(partLoc, 1, 20, 2,
+ checkDataWritten2(partLoc, 3, 22, 2,
validationQuery, false, "1\tHello streaming",
"2\tWelcome to streaming",
"3\tHello streaming - once again");
txnBatch2.commit();
- checkDataWritten2(partLoc, 1, 20, 2,
+ checkDataWritten2(partLoc, 3, 22, 2,
validationQuery, true, "1\tHello streaming",
"2\tWelcome to streaming",
"3\tHello streaming - once again",
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 0212e076cd..0e1df69656 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -263,11 +263,6 @@ public boolean dropTable(String catName, String dbName, String tableName)
}
}
- @Override
- public Table getTable(String catName, String dbName, String tableName) throws MetaException {
- return objectStore.getTable(catName, dbName, tableName);
- }
-
@Override
public Table getTable(String catName, String dbName, String tableName,
String writeIdList) throws MetaException {
@@ -280,12 +275,6 @@ public boolean addPartition(Partition part)
return objectStore.addPartition(part);
}
- @Override
- public Partition getPartition(String catName, String dbName, String tableName, List<String> partVals)
- throws MetaException, NoSuchObjectException {
- return objectStore.getPartition(catName, dbName, tableName, partVals);
- }
-
@Override
public Partition getPartition(String catName, String dbName, String tableName,
List<String> partVals, String writeIdList)
@@ -305,15 +294,15 @@ public boolean dropPartition(String catName, String dbName, String tableName, Li
}
@Override
- public List<Partition> getPartitions(String catName, String dbName, String tableName, int max)
+ public List<Partition> getPartitions(String catName, String dbName, String tableName, int max, String writeIdList)
throws MetaException, NoSuchObjectException {
- return objectStore.getPartitions(catName, dbName, tableName, max);
+ return objectStore.getPartitions(catName, dbName, tableName, max, writeIdList);
}
@Override
public Map<String, String> getPartitionLocations(String catName, String dbName, String tblName,
- String baseLocationToNotShow, int max) {
- return objectStore.getPartitionLocations(catName, dbName, tblName, baseLocationToNotShow, max);
+ String baseLocationToNotShow, int max, String writeIdList) {
+ return objectStore.getPartitionLocations(catName, dbName, tblName, baseLocationToNotShow, max, writeIdList);
}
@Override
@@ -378,9 +367,9 @@ public Table alterTable(String catName, String dbName, String name, Table newTab
}
@Override
- public List<String> listPartitionNames(String catName, String dbName, String tblName, short maxParts)
+ public List<String> listPartitionNames(String catName, String dbName, String tblName, short maxParts, String writeIdList)
throws MetaException {
- return objectStore.listPartitionNames(catName, dbName, tblName, maxParts);
+ return objectStore.listPartitionNames(catName, dbName, tblName, maxParts, writeIdList);
}
@Override
@@ -388,7 +377,7 @@ public PartitionValuesResponse listPartitionValues(String catName, String db_nam
String tbl_name, List<FieldSchema> cols,
boolean applyDistinct, String filter,
boolean ascending, List<FieldSchema> order,
- long maxParts) throws MetaException {
+ long maxParts, String writeIdList) throws MetaException {
return null;
}
@@ -416,42 +405,43 @@ public Partition alterPartition(String catName, String dbName, String tblName, L
@Override
public List<Partition> getPartitionsByFilter(String catName, String dbName, String tblName,
- String filter, short maxParts) throws MetaException, NoSuchObjectException {
- return objectStore.getPartitionsByFilter(catName, dbName, tblName, filter, maxParts);
+ String filter, short maxParts, String writeIdList) throws MetaException, NoSuchObjectException {
+ return objectStore.getPartitionsByFilter(catName, dbName, tblName, filter, maxParts, writeIdList);
}
@Override
public List<Partition> getPartitionSpecsByFilterAndProjection(Table table,
- GetPartitionsProjectionSpec projectionSpec, GetPartitionsFilterSpec filterSpec)
+ GetPartitionsProjectionSpec projectionSpec, GetPartitionsFilterSpec filterSpec, String writeIdList)
throws MetaException, NoSuchObjectException {
- return objectStore.getPartitionSpecsByFilterAndProjection(table, projectionSpec, filterSpec);
+ return objectStore.getPartitionSpecsByFilterAndProjection(table, projectionSpec, filterSpec, writeIdList);
}
@Override
public int getNumPartitionsByFilter(String catName, String dbName, String tblName,
- String filter) throws MetaException, NoSuchObjectException {
- return objectStore.getNumPartitionsByFilter(catName, dbName, tblName, filter);
+ String filter, String writeIdList) throws MetaException, NoSuchObjectException {
+ return objectStore.getNumPartitionsByFilter(catName, dbName, tblName, filter, writeIdList);
}
@Override
public int getNumPartitionsByExpr(String catName, String dbName, String tblName,
- byte[] expr) throws MetaException, NoSuchObjectException {
- return objectStore.getNumPartitionsByExpr(catName, dbName, tblName, expr);
+ byte[] expr, String writeIdList) throws MetaException, NoSuchObjectException {
+ return objectStore.getNumPartitionsByExpr(catName, dbName, tblName, expr, writeIdList);
}
@Override
public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
- List<String> partNames)
+ List<String> partNames, String writeIdList)
throws MetaException, NoSuchObjectException {
return objectStore.getPartitionsByNames(
- catName, dbName, tblName, partNames);
+ catName, dbName, tblName, partNames, writeIdList);
}
@Override
public boolean getPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr,
- String defaultPartitionName, short maxParts, List<Partition> result) throws TException {
+ String defaultPartitionName, short maxParts, List<Partition> result,
+ String writeIdList) throws TException {
return objectStore.getPartitionsByExpr(catName,
- dbName, tblName, expr, defaultPartitionName, maxParts, result);
+ dbName, tblName, expr, defaultPartitionName, maxParts, result, writeIdList);
}
@Override
@@ -622,34 +612,36 @@ public Role getRole(String roleName) throws NoSuchObjectException {
@Override
public Partition getPartitionWithAuth(String catName, String dbName, String tblName,
- List<String> partVals, String userName, List<String> groupNames)
+ List<String> partVals, String userName,
+ List<String> groupNames, String writeIdList)
throws MetaException, NoSuchObjectException, InvalidObjectException {
return objectStore.getPartitionWithAuth(catName, dbName, tblName, partVals, userName,
- groupNames);
+ groupNames, writeIdList);
}
@Override
public List<Partition> getPartitionsWithAuth(String catName, String dbName, String tblName,
- short maxParts, String userName, List<String> groupNames)
+ short maxParts, String userName,
+ List<String> groupNames, String writeIdList)
throws MetaException, NoSuchObjectException, InvalidObjectException {
return objectStore.getPartitionsWithAuth(catName, dbName, tblName, maxParts, userName,
- groupNames);
+ groupNames, writeIdList);
}
@Override
public List<String> listPartitionNamesPs(String catName, String dbName, String tblName,
- List<String> partVals, short maxParts)
+ List<String> partVals, short maxParts, String writeIdList)
throws MetaException, NoSuchObjectException {
- return objectStore.listPartitionNamesPs(catName, dbName, tblName, partVals, maxParts);
+ return objectStore.listPartitionNamesPs(catName, dbName, tblName, partVals, maxParts, writeIdList);
}
@Override
public List<Partition> listPartitionsPsWithAuth(String catName, String dbName, String tblName,
List<String> partVals, short maxParts, String userName,
- List<String> groupNames)
+ List<String> groupNames, String writeIdList)
throws MetaException, InvalidObjectException, NoSuchObjectException {
return objectStore.listPartitionsPsWithAuth(catName, dbName, tblName, partVals, maxParts,
- userName, groupNames);
+ userName, groupNames, writeIdList);
}
@Override
@@ -720,12 +712,6 @@ public long cleanupEvents() {
return objectStore.listTableColumnGrantsAll(catName, dbName, tableName, columnName);
}
- @Override
- public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tableName,
- List<String> colNames) throws MetaException, NoSuchObjectException {
- return objectStore.getTableColumnStatistics(catName, dbName, tableName, colNames);
- }
-
@Override
public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tableName,
List<String> colNames,
@@ -817,14 +803,6 @@ public void setMetaStoreSchemaVersion(String schemaVersion, String comment) thro
}
- @Override
- public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName,
- String tblName, List<String> colNames,
- List<String> partNames)
- throws MetaException, NoSuchObjectException {
- return objectStore.getPartitionColumnStatistics(catName, dbName, tblName , colNames, partNames);
- }
-
@Override
public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName,
String tblName, List<String> colNames,
@@ -837,9 +815,9 @@ public void setMetaStoreSchemaVersion(String schemaVersion, String comment) thro
@Override
public boolean doesPartitionExist(String catName, String dbName, String tableName,
- List<FieldSchema> partKeys, List<String> partVals)
+ List<FieldSchema> partKeys, List<String> partVals, String writeIdList)
throws MetaException, NoSuchObjectException {
- return objectStore.doesPartitionExist(catName, dbName, tableName, partKeys, partVals);
+ return objectStore.doesPartitionExist(catName, dbName, tableName, partKeys, partVals, writeIdList);
}
@Override
@@ -905,13 +883,6 @@ public Function getFunction(String catName, String dbName, String funcName)
return objectStore.getFunctions(catName, dbName, pattern);
}
- @Override
- public AggrStats get_aggr_stats_for(String catName, String dbName,
- String tblName, List<String> partNames, List<String> colNames)
- throws MetaException {
- return null;
- }
-
@Override
public AggrStats get_aggr_stats_for(String catName, String dbName,
String tblName, List<String> partNames, List<String> colNames,
@@ -1324,5 +1295,4 @@ public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
NoSuchObjectException {
return null;
}
-
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
index 285f30b008..74fc40232d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
@@ -10,6 +10,7 @@
import org.apache.hadoop.hive.metastore.*;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.MergedColumnStatsForPartitions;
import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -44,7 +45,6 @@ public void setUp() throws Exception {
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
MetastoreConf.setVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS, DbNotificationListener.class.getName());
MetastoreConf.setVar(conf, ConfVars.RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.cache.CachedStore");
- MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT, true);
MetastoreConf.setBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED, true);
MetastoreConf.setBoolVar(conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED, false);
MetaStoreTestUtils.setConfForStandloneMode(conf);
@@ -120,84 +120,6 @@ private void comparePartitions(Partition part1, Partition part2) {
Assert.assertEquals(part1.getLastAccessTime(), part2.getLastAccessTime());
}
- @Test
- public void testDatabaseOpsForUpdateUsingEvents() throws Exception {
- RawStore rawStore = hmsHandler.getMS();
-
- // Prewarm CachedStore
- CachedStore.setCachePrewarmedState(false);
- CachedStore.prewarm(rawStore);
-
- // Add a db via rawStore
- String dbName = "testDatabaseOps";
- String dbOwner = "user1";
- Database db = createTestDb(dbName, dbOwner);
-
- hmsHandler.create_database(db);
- db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName);
-
- // Read database via CachedStore
- Database dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName);
- Assert.assertEquals(db, dbRead);
-
- // Add another db via rawStore
- final String dbName1 = "testDatabaseOps1";
- Database db1 = createTestDb(dbName1, dbOwner);
- hmsHandler.create_database(db1);
- db1 = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
-
- // Read database via CachedStore
- dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName1);
- Assert.assertEquals(db1, dbRead);
-
- // Alter the db via rawStore (can only alter owner or parameters)
- dbOwner = "user2";
- Database newdb = new Database(db);
- newdb.setOwnerName(dbOwner);
- hmsHandler.alter_database(dbName, newdb);
- newdb = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName);
-
- // Read db via cachedStore
- dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName);
- Assert.assertEquals(newdb, dbRead);
-
- // Add another db via rawStore
- final String dbName2 = "testDatabaseOps2";
- Database db2 = createTestDb(dbName2, dbOwner);
- hmsHandler.create_database(db2);
- db2 = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName2);
-
- // Alter db "testDatabaseOps" via rawStore
- dbOwner = "user1";
- newdb = new Database(db);
- newdb.setOwnerName(dbOwner);
- hmsHandler.alter_database(dbName, newdb);
- newdb = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName);
-
- // Drop db "testDatabaseOps1" via rawStore
- Database dropDb = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
- hmsHandler.drop_database(dbName1, true, true);
-
- // Read the newly added db via CachedStore
- dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName2);
- Assert.assertEquals(db2, dbRead);
-
- // Read the altered db via CachedStore (altered user from "user2" to "user1")
- dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName);
- Assert.assertEquals(newdb, dbRead);
-
- // Try to read the dropped db after cache update
- dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName1);
- Assert.assertEquals(null, dbRead);
-
- // Clean up
- hmsHandler.drop_database(dbName, true, true);
- hmsHandler.drop_database(dbName2, true, true);
- sharedCache.getDatabaseCache().clear();
- sharedCache.clearTableCache();
- sharedCache.getSdCache().clear();
- }
-
@Test
public void testTableOpsForUpdateUsingEvents() throws Exception {
long lastEventId = -1;
@@ -205,7 +127,7 @@ public void testTableOpsForUpdateUsingEvents() throws Exception {
// Prewarm CachedStore
CachedStore.setCachePrewarmedState(false);
- CachedStore.prewarm(rawStore);
+ CachedStore.prewarm(rawStore, conf);
// Add a db via rawStore
String dbName = "test_table_ops";
@@ -225,19 +147,17 @@ public void testTableOpsForUpdateUsingEvents() throws Exception {
List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols);
hmsHandler.create_table(tbl);
- tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
+ tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName, null);
- // Read database, table via CachedStore
- Database dbRead= sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName);
- Assert.assertEquals(db, dbRead);
- Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName);
+ // Read table via CachedStore
+ Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, null);
compareTables(tblRead, tbl);
// Add a new table via rawStore
String tblName2 = "tbl2";
Table tbl2 = createTestTbl(dbName, tblName2, tblOwner, cols, ptnCols);
hmsHandler.create_table(tbl2);
- tbl2 = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName2);
+ tbl2 = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName2, null);
// Alter table "tbl" via rawStore
tblOwner = "role1";
@@ -245,7 +165,7 @@ public void testTableOpsForUpdateUsingEvents() throws Exception {
newTable.setOwner(tblOwner);
newTable.setOwnerType(PrincipalType.ROLE);
hmsHandler.alter_table(dbName, tblName, newTable);
- newTable = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
+ newTable = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName, null);
Assert.assertEquals("Owner of the table did not change.", tblOwner, newTable.getOwner());
Assert.assertEquals("Owner type of the table did not change", PrincipalType.ROLE, newTable.getOwnerType());
@@ -254,23 +174,22 @@ public void testTableOpsForUpdateUsingEvents() throws Exception {
hmsHandler.drop_table(dbName, tblName2, true);
// Read the altered "tbl" via CachedStore
- tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName);
+ tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, null);
compareTables(tblRead, newTable);
// Try to read the dropped "tbl2" via CachedStore (should throw exception)
- tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName2);
+ tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName2, null);
Assert.assertNull(tblRead);
// Clean up
hmsHandler.drop_database(dbName, true, true);
- tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName2);
+ tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName2, null);
Assert.assertNull(tblRead);
- tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName);
+ tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, null);
Assert.assertNull(tblRead);
- sharedCache.getDatabaseCache().clear();
sharedCache.clearTableCache();
sharedCache.getSdCache().clear();
}
@@ -282,7 +201,7 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {
// Prewarm CachedStore
CachedStore.setCachePrewarmedState(false);
- CachedStore.prewarm(rawStore);
+ CachedStore.prewarm(rawStore, conf);
// Add a db via rawStore
String dbName = "test_partition_ops";
@@ -304,7 +223,7 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {
ptnCols.add(ptnCol1);
Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols);
hmsHandler.create_table(tbl);
- tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
+ tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName, null);
final String ptnColVal1 = "aaa";
Map<String, String> partParams = new HashMap<String, String>();
@@ -313,7 +232,7 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {
0, tbl.getSd(), partParams);
ptn1.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.add_partition(ptn1);
- ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1));
+ ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1), null);
final String ptnColVal2 = "bbb";
Partition ptn2 =
@@ -321,13 +240,10 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {
0, tbl.getSd(), partParams);
ptn2.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.add_partition(ptn2);
- ptn2 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2));
+ ptn2 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2), null);
- // Read database, table, partition via CachedStore
- Database dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase());
- Assert.assertEquals(db, dbRead);
Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
- dbName.toLowerCase(), tblName.toLowerCase());
+ dbName.toLowerCase(), tblName.toLowerCase(), null);
compareTables(tbl, tblRead);
Partition ptn1Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal1));
@@ -343,20 +259,20 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {
0, tbl.getSd(), partParams);
ptn3.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.add_partition(ptn3);
- ptn3 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3));
+ ptn3 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3), null);
// Alter an existing partition ("aaa") via rawStore
- ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1));
+ ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1), null);
final String ptnColVal1Alt = "aaa";
Partition ptn1Atl =
new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0,
0, tbl.getSd(), partParams);
ptn1Atl.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.alter_partitions(dbName, tblName, Arrays.asList(ptn1Atl));
- ptn1Atl = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt));
+ ptn1Atl = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt), null);
// Drop an existing partition ("bbb") via rawStore
- Partition ptnDrop = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2));
+ Partition ptnDrop = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2), null);
hmsHandler.drop_partition(dbName, tblName, Arrays.asList(ptnColVal2), false);
// Read the newly added partition via CachedStore
@@ -382,13 +298,12 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {
// Clean up
rawStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName);
- sharedCache.getDatabaseCache().clear();
sharedCache.clearTableCache();
sharedCache.getSdCache().clear();
}
- private void updateTableColStats(String dbName, String tblName, String[] colName,
- double highValue, double avgColLen, boolean isTxnTable) throws Throwable {
+ private long updateTableColStats(String dbName, String tblName, String[] colName,
+ double highValue, double avgColLen, boolean isTxnTable, long lastEventId) throws Throwable {
long writeId = -1;
String validWriteIds = null;
if (isTxnTable) {
@@ -412,6 +327,7 @@ private void updateTableColStats(String dbName, String tblName, String[] colName
// write stats objs persistently
hmsHandler.update_table_column_statistics_req(setTblColStat);
+ lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId, null);
validateTablePara(dbName, tblName);
ColumnStatistics colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
@@ -423,10 +339,11 @@ private void updateTableColStats(String dbName, String tblName, String[] colName
dbName, tblName, Lists.newArrayList(colName[1]), validWriteIds, true);
Assert.assertEquals(colStatsCache.getStatsObj().get(0).getColName(), colName[1]);
verifyStatString(colStatsCache.getStatsObj().get(0), colName[1], avgColLen);
+ return lastEventId;
}
- private void updatePartColStats(String dbName, String tblName, boolean isTxnTable, String[] colName,
- String partName, double highValue, double avgColLen) throws Throwable {
+ private long updatePartColStats(String dbName, String tblName, boolean isTxnTable, String[] colName,
+ String partName, double highValue, double avgColLen, long lastEventId) throws Throwable {
long writeId = -1;
String validWriteIds = null;
List<Long> txnIds = null;
@@ -471,7 +388,7 @@ private void updatePartColStats(String dbName, String tblName, boolean isTxnTabl
} else {
Assert.assertEquals(statRowStore.get(0).isIsStatsCompliant(), false);
}
-
+ lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId, conf);
List<ColumnStatistics> statSharedCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME,
dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]),
validWriteIds, true);
@@ -489,6 +406,8 @@ private void updatePartColStats(String dbName, String tblName, boolean isTxnTabl
statPartCache = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
CachedStore.partNameToVals(partName), colName[1], validWriteIds);
verifyStatString(statPartCache.getColumnStatisticsObj(), colName[1], avgColLen);
+
+ return lastEventId;
}
private List<ColumnStatisticsObj> getStatsObjects(String dbName, String tblName, String[] colName,
@@ -572,7 +491,7 @@ private void setUpBeforeTest(String dbName, String tblName, String[] colName, bo
// Prewarm CachedStore
CachedStore.setCachePrewarmedState(false);
- CachedStore.prewarm(rawStore);
+ CachedStore.prewarm(rawStore, conf);
// Add a db via rawStore
Database db = createTestDb(dbName, dbOwner);
@@ -670,8 +589,8 @@ private String getValidWriteIds(String dbName, String tblName) throws Throwable
}
private void validateTablePara(String dbName, String tblName) throws Throwable {
- Table tblRead = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
- Table tblRead1 = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName);
+ Table tblRead = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName, null);
+ Table tblRead1 = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, null);
Assert.assertEquals(tblRead.getParameters(), tblRead1.getParameters());
}
@@ -706,18 +625,19 @@ private void testTableColStatInternal(String dbName, String tblName, boolean isT
String[] colName = new String[]{"income", "name"};
double highValue = 1200000.4525;
double avgColLen = 50.30;
+ long lastEventId = 0;
setUpBeforeTest(dbName, tblName, colName, isTxnTable);
- updateTableColStats(dbName, tblName, colName, highValue, avgColLen, isTxnTable);
+ lastEventId = updateTableColStats(dbName, tblName, colName, highValue, avgColLen, isTxnTable, lastEventId);
if (!isTxnTable) {
deleteColStats(dbName, tblName, colName);
}
tblName = "tbl_part";
createTableWithPart(dbName, tblName, colName, isTxnTable);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1, null);
String partName = partitions.get(0);
- updatePartColStats(dbName, tblName, isTxnTable, colName, partName, highValue, avgColLen);
+ lastEventId = updatePartColStats(dbName, tblName, isTxnTable, colName, partName, highValue, avgColLen, lastEventId);
if (!isTxnTable) {
deletePartColStats(dbName, tblName, colName, partName);
}
@@ -747,11 +667,12 @@ public void testTableColumnStatisticsTxnTableMulti() throws Throwable {
setUpBeforeTest(dbName, null, colName, true);
createTableWithPart(dbName, tblName, colName, true);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1, null);
String partName = partitions.get(0);
- updatePartColStats(dbName, tblName, true, colName, partName, highValue, avgColLen);
- updatePartColStats(dbName, tblName, true, colName, partName, 1200000.4521, avgColLen);
- updatePartColStats(dbName, tblName, true, colName, partName, highValue, 34.78);
+ long lastEventId = 0;
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partName, highValue, avgColLen, lastEventId);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partName, 1200000.4521, avgColLen, lastEventId);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partName, highValue, 34.78, lastEventId);
}
@Test
@@ -761,10 +682,11 @@ public void testTableColumnStatisticsTxnTableMultiAbort() throws Throwable {
String[] colName = new String[]{"income", "name"};
double highValue = 1200000.4525;
double avgColLen = 50.30;
+ long lastEventId = 0;
setUpBeforeTest(dbName, null, colName, true);
createTableWithPart(dbName, tblName, colName, true);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1, null);
String partName = partitions.get(0);
List<Long> txnIds = allocateTxns(1);
@@ -804,6 +726,7 @@ public void testTableColumnStatisticsTxnTableMultiAbort() throws Throwable {
verifyStat(statRawStore.get(0).getStatsObj(), colName, highValue, avgColLen);
Assert.assertEquals(statRawStore.get(0).isIsStatsCompliant(), false);
+ lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId, conf);
List<ColumnStatistics> statsListFromCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME,
dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]),
validWriteIds, true);
@@ -824,14 +747,15 @@ public void testTableColumnStatisticsTxnTableOpenTxn() throws Throwable {
String[] colName = new String[]{"income", "name"};
double highValue = 1200000.4121;
double avgColLen = 23.30;
+ long lastEventId = 0;
setUpBeforeTest(dbName, null, colName, true);
createTableWithPart(dbName, tblName, colName, true);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1, null);
String partName = partitions.get(0);
// update part col stats successfully.
- updatePartColStats(dbName, tblName, true, colName, partName, 1.2, 12.2);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partName, 1.2, 12.2, lastEventId);
List<Long> txnIds = allocateTxns(1);
long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
@@ -854,6 +778,7 @@ public void testTableColumnStatisticsTxnTableOpenTxn() throws Throwable {
// write stats objs persistently
hmsHandler.update_partition_column_statistics_req(setTblColStat);
+ lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId, conf);
// keep the txn open and verify that the stats got is not compliant.
@@ -904,9 +829,9 @@ private void verifyAggrStat(String dbName, String tblName, String[] colName, Lis
Assert.assertEquals(aggrStatsCached, aggrStats);
//Assert.assertEquals(aggrStatsCached.isIsStatsCompliant(), true);
- List<ColumnStatisticsObj> stats = sharedCache.getAggrStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
- Collections.singletonList(colName[0]), SharedCache.StatsType.ALL);
- Assert.assertEquals(stats.get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
+ MergedColumnStatsForPartitions stats = CachedStore.mergeColStatsForPartitions(DEFAULT_CATALOG_NAME, dbName, tblName, Lists.newArrayList("income=1", "income=2"),
+ Collections.singletonList(colName[0]), sharedCache, SharedCache.StatsType.ALL, validWriteIds, false, 0.0);
+ Assert.assertEquals(stats.colStats.get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
}
@Test
@@ -917,15 +842,17 @@ public void testAggrStat() throws Throwable {
setUpBeforeTest(dbName, null, colName, false);
createTableWithPart(dbName, tblName, colName, false);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short) -1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short) -1, null);
String partName = partitions.get(0);
// update part col stats successfully.
- updatePartColStats(dbName, tblName, false, colName, partitions.get(0), 2, 12);
- updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 4, 10);
+ long lastEventId = 0;
+ lastEventId = updatePartColStats(dbName, tblName, false, colName, partitions.get(0), 2, 12, lastEventId);
+ lastEventId = updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 4, 10, lastEventId);
+ lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId, conf);
verifyAggrStat(dbName, tblName, colName, partitions, false, 4);
- updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 3, 10);
+ lastEventId = updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 3, 10, lastEventId);
verifyAggrStat(dbName, tblName, colName, partitions, false, 3);
}
@@ -934,18 +861,19 @@ public void testAggrStatTxnTable() throws Throwable {
String dbName = "aggr_stats_test_db_txn";
String tblName = "tbl_part";
String[] colName = new String[]{"income", "name"};
+ long lastEventId = 0;
setUpBeforeTest(dbName, null, colName, true);
createTableWithPart(dbName, tblName, colName, true);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1, null);
String partName = partitions.get(0);
// update part col stats successfully.
- updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12);
- updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12, lastEventId);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10, lastEventId);
verifyAggrStat(dbName, tblName, colName, partitions, true, 4);
- updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 3, 10);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 3, 10, lastEventId);
verifyAggrStat(dbName, tblName, colName, partitions, true, 3);
List<Long> txnIds = allocateTxns(1);
@@ -988,15 +916,16 @@ public void testAggrStatAbortTxn() throws Throwable {
String dbName = "aggr_stats_test_db_txn_abort";
String tblName = "tbl_part";
String[] colName = new String[]{"income", "name"};
+ long lastEventId = 0;
setUpBeforeTest(dbName, null, colName, true);
createTableWithPart(dbName, tblName, colName, true);
- List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1, null);
String partName = partitions.get(0);
// update part col stats successfully.
- updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12);
- updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12, lastEventId);
+ lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10, lastEventId);
verifyAggrStat(dbName, tblName, colName, partitions, true, 4);
List<Long> txnIds = allocateTxns(4);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index cfd3bfea1f..85b8375c46 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -683,11 +683,11 @@ public void testAcidInsertWithRemoveUnion() throws Exception {
}
String[][] expected2 = {
- {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000001_0000001_0003/bucket_00000"}
+ {"{\"writeid\":2,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000002_0000002_0001/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000002_0000002_0001/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000002_0000002_0002/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000002_0000002_0002/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000002_0000002_0003/bucket_00000"}
};
Assert.assertEquals("Unexpected row count", expected2.length, rs.size());
for(int i = 0; i < expected2.length; i++) {
@@ -728,11 +728,11 @@ public void testBucketedAcidInsertWithRemoveUnion() throws Exception {
LOG.warn(s);
}
String[][] expected2 = {
- {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t5\t6", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t9\t10", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"}
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000002_0000002_0000/bucket_00001"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000002_0000002_0000/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":2}\t5\t6", "warehouse/t/delta_0000002_0000002_0000/bucket_00001"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000002_0000002_0000/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t9\t10", "warehouse/t/delta_0000002_0000002_0000/bucket_00001"}
};
Assert.assertEquals("Unexpected row count", expected2.length, rs.size());
for(int i = 0; i < expected2.length; i++) {
@@ -866,14 +866,14 @@ public void testCrudMajorCompactionSplitGrouper() throws Exception {
runStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", confForTez);
runStatementOnDriver("delete from " + tblName + " where b = 2");
List<String> expectedRs = new ArrayList<>();
- expectedRs.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4");
- expectedRs.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3");
- expectedRs.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4");
- expectedRs.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3");
- expectedRs.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t4");
- expectedRs.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3");
- expectedRs.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4");
- expectedRs.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3");
+ expectedRs.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t2\t4");
+ expectedRs.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t2\t3");
+ expectedRs.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t3\t4");
+ expectedRs.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t3\t3");
+ expectedRs.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4");
+ expectedRs.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t1\t3");
+ expectedRs.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t4\t4");
+ expectedRs.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t4\t3");
List<String> rs =
runStatementOnDriver("select ROW__ID, * from " + tblName + " order by ROW__ID.bucketid, ROW__ID", confForTez);
HiveConf.setVar(confForTez, HiveConf.ConfVars.SPLIT_GROUPING_MODE, "compactor");
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 61be5a3a5b..6aeb9f3f7c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -643,18 +643,18 @@ public void minorCompactWhileStreaming() throws Exception {
Path resultFile = null;
for (int i = 0; i < names.length; i++) {
names[i] = stat[i].getPath().getName();
- if (names[i].equals("delta_0000001_0000004_v0000009")) {
+ if (names[i].equals("delta_0000002_0000005_v0000009")) {
resultFile = stat[i].getPath();
}
}
Arrays.sort(names);
- String[] expected = new String[]{"delta_0000001_0000002",
- "delta_0000001_0000004_v0000009", "delta_0000003_0000004", "delta_0000005_0000006"};
+ String[] expected = new String[]{"delta_0000002_0000003",
+ "delta_0000002_0000005_v0000009", "delta_0000004_0000005", "delta_0000006_0000007"};
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names) + ",stat=" + toString(stat));
}
checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty,
- 0, 1L, 4L, 1);
+ 0, 2L, 5L, 1);
} finally {
if (connection != null) {
@@ -697,8 +697,8 @@ public void majorCompactWhileStreaming() throws Exception {
Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
}
String name = stat[0].getPath().getName();
- Assert.assertEquals("base_0000004_v0000009", name);
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1);
+ Assert.assertEquals("base_0000005_v0000009", name);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 2L, 5L, 1);
} finally {
if (connection != null) {
connection.close();
@@ -740,17 +740,17 @@ private void minorCompactAfterAbort(boolean newStreamingAPI) throws Exception {
Path resultDelta = null;
for (int i = 0; i < names.length; i++) {
names[i] = stat[i].getPath().getName();
- if (names[i].equals("delta_0000001_0000004_v0000009")) {
+ if (names[i].equals("delta_0000002_0000005_v0000009")) {
resultDelta = stat[i].getPath();
}
}
Arrays.sort(names);
- String[] expected = new String[]{"delta_0000001_0000002",
- "delta_0000001_0000004_v0000009", "delta_0000003_0000004"};
+ String[] expected = new String[]{"delta_0000002_0000003",
+ "delta_0000002_0000005_v0000009", "delta_0000004_0000005"};
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1);
+ checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 2L, 5L, 1);
}
@Test
@@ -787,10 +787,10 @@ private void majorCompactAfterAbort(boolean newStreamingAPI) throws Exception {
Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
}
String name = stat[0].getPath().getName();
- if (!name.equals("base_0000004_v0000009")) {
+ if (!name.equals("base_0000005_v0000009")) {
Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000004");
}
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 2L, 5L, 1);
}
@@ -817,12 +817,12 @@ public void mmTable() throws Exception {
runMajorCompaction(dbName, tblName);
verifyFooBarResult(tblName, 1);
- verifyHasBase(table.getSd(), fs, "base_0000002_v0000006");
+ verifyHasBase(table.getSd(), fs, "base_0000003_v0000006");
// Make sure we don't compact if we don't need to compact.
runMajorCompaction(dbName, tblName);
verifyFooBarResult(tblName, 1);
- verifyHasBase(table.getSd(), fs, "base_0000002_v0000006");
+ verifyHasBase(table.getSd(), fs, "base_0000003_v0000006");
}
@Test
@@ -938,7 +938,7 @@ public void mmTableBucketed() throws Exception {
runMajorCompaction(dbName, tblName);
verifyFooBarResult(tblName, 1);
- String baseDir = "base_0000002_v0000006";
+ String baseDir = "base_0000003_v0000006";
verifyHasBase(table.getSd(), fs, baseDir);
FileStatus[] files = fs.listStatus(new Path(table.getSd().getLocation(), baseDir),
@@ -965,7 +965,7 @@ public void mmTableOpenWriteId() throws Exception {
long openTxnId = msClient.openTxn("test");
long openWriteId = msClient.allocateTableWriteId(openTxnId, dbName, tblName);
- Assert.assertEquals(3, openWriteId); // Just check to make sure base_5 below is not new.
+ Assert.assertEquals(4, openWriteId); // Just check to make sure base_6 below is not new.
executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
@@ -974,19 +974,19 @@ public void mmTableOpenWriteId() throws Exception {
runMajorCompaction(dbName, tblName); // Don't compact 4 and 5; 3 is opened.
FileSystem fs = FileSystem.get(conf);
- verifyHasBase(table.getSd(), fs, "base_0000002_v0000010");
+ verifyHasBase(table.getSd(), fs, "base_0000003_v0000010");
verifyDirCount(table.getSd(), fs, 1, AcidUtils.baseFileFilter);
verifyFooBarResult(tblName, 2);
runCleaner(conf);
- verifyHasDir(table.getSd(), fs, "delta_0000004_0000004_0000", AcidUtils.deltaFileFilter);
verifyHasDir(table.getSd(), fs, "delta_0000005_0000005_0000", AcidUtils.deltaFileFilter);
+ verifyHasDir(table.getSd(), fs, "delta_0000006_0000006_0000", AcidUtils.deltaFileFilter);
verifyFooBarResult(tblName, 2);
msClient.abortTxns(Lists.newArrayList(openTxnId)); // Now abort 3.
runMajorCompaction(dbName, tblName); // Compact 4 and 5.
verifyFooBarResult(tblName, 2);
- verifyHasBase(table.getSd(), fs, "base_0000005_v0000016");
+ verifyHasBase(table.getSd(), fs, "base_0000006_v0000016");
runCleaner(conf);
verifyDeltaCount(table.getSd(), fs, 0);
}
@@ -1050,8 +1050,8 @@ public void mmTablePartitioned() throws Exception {
verifyFooBarResult(tblName, 3);
verifyDeltaCount(p3.getSd(), fs, 1);
- verifyHasBase(p1.getSd(), fs, "base_0000006_v0000010");
- verifyHasBase(p2.getSd(), fs, "base_0000006_v0000014");
+ verifyHasBase(p1.getSd(), fs, "base_0000007_v0000010");
+ verifyHasBase(p2.getSd(), fs, "base_0000007_v0000014");
executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
@@ -1061,8 +1061,8 @@ public void mmTablePartitioned() throws Exception {
// Make sure we don't compact if we don't need to compact; but do if we do.
verifyFooBarResult(tblName, 4);
verifyDeltaCount(p3.getSd(), fs, 1);
- verifyHasBase(p1.getSd(), fs, "base_0000006_v0000010");
- verifyHasBase(p2.getSd(), fs, "base_0000008_v0000023");
+ verifyHasBase(p1.getSd(), fs, "base_0000007_v0000010");
+ verifyHasBase(p2.getSd(), fs, "base_0000009_v0000023");
}
@@ -1159,8 +1159,8 @@ private void majorCompactWhileStreamingForSplitUpdate(boolean newStreamingAPI) t
Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
}
String name = stat[0].getPath().getName();
- Assert.assertEquals("base_0000004_v0000009", name);
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 1, 1L, 4L, 2);
+ Assert.assertEquals("base_0000005_v0000009", name);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 1, 2L, 5L, 2);
if (connection1 != null) {
connection1.close();
}
@@ -1209,18 +1209,18 @@ public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exce
Path minorCompactedDelta = null;
for (int i = 0; i < deltas.length; i++) {
deltas[i] = stat[i].getPath().getName();
- if (deltas[i].equals("delta_0000001_0000003_v0000006")) {
+ if (deltas[i].equals("delta_0000002_0000004_v0000006")) {
minorCompactedDelta = stat[i].getPath();
}
}
Arrays.sort(deltas);
- String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003_v0000006",
- "delta_0000002_0000002_0000"};
+ String[] expectedDeltas = new String[]{"delta_0000002_0000002_0000", "delta_0000002_0000004_v0000006",
+ "delta_0000003_0000003_0000"};
if (!Arrays.deepEquals(expectedDeltas, deltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
}
checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty,
- 0, 1L, 2L, 1);
+ 0, 2L, 3L, 1);
// Verify that we have got correct set of delete_deltas.
FileStatus[] deleteDeltaStat =
@@ -1229,17 +1229,17 @@ public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exce
Path minorCompactedDeleteDelta = null;
for (int i = 0; i < deleteDeltas.length; i++) {
deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
- if (deleteDeltas[i].equals("delete_delta_0000001_0000003_v0000006")) {
+ if (deleteDeltas[i].equals("delete_delta_0000002_0000004_v0000006")) {
minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
}
}
Arrays.sort(deleteDeltas);
- String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003_v0000006", "delete_delta_0000003_0000003_0000"};
+ String[] expectedDeleteDeltas = new String[]{"delete_delta_0000002_0000004_v0000006", "delete_delta_0000004_0000004_0000"};
if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty,
- 0, 2L, 2L, 1);
+ 0, 3L, 3L, 1);
}
@Test
@@ -1281,18 +1281,18 @@ public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception
Path minorCompactedDelta = null;
for (int i = 0; i < deltas.length; i++) {
deltas[i] = stat[i].getPath().getName();
- if (deltas[i].equals("delta_0000001_0000002_v0000005")) {
+ if (deltas[i].equals("delta_0000002_0000003_v0000005")) {
minorCompactedDelta = stat[i].getPath();
}
}
Arrays.sort(deltas);
- String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002_v0000005",
- "delta_0000002_0000002_0000"};
+ String[] expectedDeltas = new String[]{"delta_0000002_0000002_0000", "delta_0000002_0000003_v0000005",
+ "delta_0000003_0000003_0000"};
if (!Arrays.deepEquals(expectedDeltas, deltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
}
checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty,
- 0, 1L, 2L, 1);
+ 0, 2L, 3L, 1);
//Assert that we have no delete deltas if there are no input delete events.
FileStatus[] deleteDeltaStat =
@@ -1358,18 +1358,18 @@ private void minorCompactWhileStreamingWithSplitUpdate(boolean newStreamingAPI)
Path resultFile = null;
for (int i = 0; i < names.length; i++) {
names[i] = stat[i].getPath().getName();
- if (names[i].equals("delta_0000001_0000004_v0000009")) {
+ if (names[i].equals("delta_0000002_0000005_v0000009")) {
resultFile = stat[i].getPath();
}
}
Arrays.sort(names);
- String[] expected = new String[]{"delta_0000001_0000002",
- "delta_0000001_0000004_v0000009", "delta_0000003_0000004", "delta_0000005_0000006"};
+ String[] expected = new String[]{"delta_0000002_0000003",
+ "delta_0000002_0000005_v0000009", "delta_0000004_0000005", "delta_0000006_0000007"};
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty,
- 0, 1L, 4L, 1);
+ 0, 2L, 5L, 1);
//Assert that we have no delete deltas if there are no input delete events.
FileStatus[] deleteDeltaStat =
@@ -1699,6 +1699,9 @@ public boolean isWriteIdAborted(long writeid) {
public RangeResponse isWriteIdRangeAborted(long minWriteId, long maxWriteId) {
return RangeResponse.ALL;
}
+
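+ // New ValidWriteIdList#commitWriteId hook (used by AcidUtils.advanceWriteId); a no-op for this test stub.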
+ @Override
+ public void commitWriteId(long writeId) {}
};
OrcInputFormat aif = new OrcInputFormat();
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/MetaStoreDumpUtility.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/MetaStoreDumpUtility.java
index 2389c3bc68..e72c5216af 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/MetaStoreDumpUtility.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/MetaStoreDumpUtility.java
@@ -41,7 +41,9 @@
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ObjectStore;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.cache.CachedStore;
import org.apache.hive.testutils.HiveTestEnvSetup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -223,6 +225,12 @@ public int compare(String str1, String str2) {
conn.close();
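+ // Reset and prewarm the shared CachedStore so cached metadata reflects the freshly loaded dump.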
+ CachedStore.clearSharedCache();
+ ObjectStore objStore = new ObjectStore();
+ objStore.setConf(conf);
+ CachedStore.getSharedCache().initialize(conf);
+ CachedStore.prewarm(objStore, conf);
+
} catch (Exception e) {
throw new RuntimeException("error while loading tpcds metastore dump", e);
}
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index d2c2ccd5ea..f330434db9 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -156,10 +156,13 @@ public void initConf() throws Exception {
conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, true);
}
- // Plug verifying metastore in for testing DirectSQL.
- conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
-
miniClusters.initConf(conf);
+
+ // Plug in CachedStore as the RawStore and enable DbNotificationListener plus ACID transaction support for tests.
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.cache.CachedStore");
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS, "org.apache.hive.hcatalog.listener.DbNotificationListener");
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
}
public QTestUtil(QTestArguments testArgs) throws Exception {
@@ -378,6 +381,7 @@ public void newSession(boolean canReuseSession) throws Exception {
miniClusters.restartSessions(canReuseSession, ss, oldSs);
closeSession(oldSs);
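+ // Re-initialize the transaction manager for the new session, since ACID support is enabled for these tests.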
+ ss.initTxnMgr(conf);
SessionState.start(ss);
cliDriver = new CliDriver();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 2eb65918c9..88d2c37b9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1090,22 +1090,35 @@ private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws Lo
}
List txnTables = getTransactionalTableList(plan);
ValidTxnWriteIdList txnWriteIds = null;
- if (compactionWriteIds != null) {
- /**
- * This is kludgy: here we need to read with Compactor's snapshot/txn
- * rather than the snapshot of the current {@code txnMgr}, in effect
- * simulating a "flashback query" but can't actually share compactor's
- * txn since it would run multiple statements. See more comments in
- * {@link org.apache.hadoop.hive.ql.txn.compactor.Worker} where it start
- * the compactor txn*/
- if (txnTables.size() != 1) {
- throw new LockException("Unexpected tables in compaction: " + txnTables);
- }
- String fullTableName = txnTables.get(0);
- txnWriteIds = new ValidTxnWriteIdList(compactorTxnId);
- txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
- } else {
- txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
+
+ // If all required table write ids were already collected (in SemanticAnalyzer), skip fetching them again
+ if (conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY) != null) {
+ txnWriteIds = new ValidTxnWriteIdList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+ for (String txnTable : txnTables) {
+ if (txnWriteIds.getTableValidWriteIdList(txnTable) == null) {
+ txnWriteIds = null;
+ break;
+ }
+ }
+ }
+ if (txnWriteIds == null) {
+ if (compactionWriteIds != null) {
+ /**
+ * This is kludgy: here we need to read with Compactor's snapshot/txn
+ * rather than the snapshot of the current {@code txnMgr}, in effect
+ * simulating a "flashback query" but can't actually share compactor's
+ * txn since it would run multiple statements. See more comments in
+ * {@link org.apache.hadoop.hive.ql.txn.compactor.Worker} where it starts
+ * the compactor txn*/
+ if (txnTables.size() != 1) {
+ throw new LockException("Unexpected tables in compaction: " + txnTables);
+ }
+ String fullTableName = txnTables.get(0);
+ txnWriteIds = new ValidTxnWriteIdList(compactorTxnId);
+ txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
+ } else {
+ txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
+ }
}
String writeIdStr = txnWriteIds.toString();
conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr);
@@ -1339,35 +1352,45 @@ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnMa
}
// If we've opened a transaction we need to commit or rollback rather than explicitly
// releasing the locks.
- conf.unset(ValidTxnList.VALID_TXNS_KEY);
- conf.unset(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
if(!checkConcurrency()) {
return;
}
- if (txnMgr.isTxnOpen()) {
- if (commit) {
- if(conf.getBoolVar(ConfVars.HIVE_IN_TEST) && conf.getBoolVar(ConfVars.HIVETESTMODEROLLBACKTXN)) {
+ try {
+ if (txnMgr.isTxnOpen()) {
+ if (commit) {
+ if(conf.getBoolVar(ConfVars.HIVE_IN_TEST) && conf.getBoolVar(ConfVars.HIVETESTMODEROLLBACKTXN)) {
+ txnMgr.rollbackTxn();
+ }
+ else {
+ txnMgr.commitTxn();//both commit & rollback clear ALL locks for this tx
+ }
+ } else {
txnMgr.rollbackTxn();
}
- else {
- txnMgr.commitTxn();//both commit & rollback clear ALL locks for this tx
- }
} else {
- txnMgr.rollbackTxn();
+ //since there is no tx, we only have locks for current query (if any)
+ if (ctx != null && ctx.getHiveLocks() != null) {
+ hiveLocks.addAll(ctx.getHiveLocks());
+ }
+ txnMgr.releaseLocks(hiveLocks);
}
- } else {
- //since there is no tx, we only have locks for current query (if any)
- if (ctx != null && ctx.getHiveLocks() != null) {
- hiveLocks.addAll(ctx.getHiveLocks());
+ } finally {
+ hiveLocks.clear();
+ if (ctx != null) {
+ ctx.setHiveLocks(null);
}
- txnMgr.releaseLocks(hiveLocks);
- }
- hiveLocks.clear();
- if (ctx != null) {
- ctx.setHiveLocks(null);
- }
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
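+ // Always clear the cached txn and write-id state, in both the driver conf and the session conf, so the next query builds a fresh snapshot.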
+ conf.unset(ValidTxnList.VALID_TXNS_KEY);
+ conf.unset(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
+ SessionState.get().getConf().unset(ValidTxnList.VALID_TXNS_KEY);
+ SessionState.get().getConf().unset(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
+ try {
+ Hive.get().clearValidWriteIdList();
+ } catch (HiveException e) {
+ throw new RuntimeException("Error clear ValidWriteIdList, this shall never happen", e);
+ }
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
+ }
}
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index 267f7d041f..1077421ac4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -21,6 +21,8 @@
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.LineageState;
import org.apache.hadoop.hive.ql.session.SessionState;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 295fe7cbd0..be4d4dc334 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1969,6 +1969,42 @@ public static TableSnapshot getTableSnapshot(Configuration conf,
validWriteIdList != null ? validWriteIdList.toString() : null);
}
+ /**
+ * Called by Hive.java for all DDL write operations. Advances the write id
+ * for the table via the transaction manager and stores it in the config. The
+ * write id is marked as committed immediately in the config: all DDL is
+ * auto-committed, so there is no chance of a rollback.
+ */
+ public static ValidWriteIdList advanceWriteId(HiveConf conf, Table tbl) throws LockException {
+ if (!isTransactionalTable(tbl)) {
+ return null;
+ }
+ HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
+ long writeId = SessionState.get().getTxnMgr().getTableWriteId(tbl.getDbName(), tbl.getTableName());
+ List txnTables = new ArrayList<>();
+ String fullTableName = getFullTableName(tbl.getDbName(), tbl.getTableName());
+ txnTables.add(fullTableName);
+ ValidTxnWriteIdList txnWriteIds;
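+ // Reuse the write-id list already cached in the conf if present; otherwise fetch a fresh one from the txn manager.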
+ if (conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY) != null) {
+ txnWriteIds = new ValidTxnWriteIdList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+ } else {
+ String txnString;
+ if (conf.get(ValidTxnList.VALID_TXNS_KEY) != null) {
+ txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ } else {
+ ValidTxnList txnIds = txnMgr.getValidTxns();
+ txnString = txnIds.toString();
+ }
+ txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
+ }
+ ValidWriteIdList writeIds = txnWriteIds.getTableValidWriteIdList(fullTableName);
+ if (writeIds != null) {
+ writeIds.commitWriteId(writeId);
+ conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, txnWriteIds.toString());
+ }
+ return writeIds;
+ }
+
/**
* Returns ValidWriteIdList for the table with the given "dbName" and "tableName".
* This is called when HiveConf has no list for the table.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index d412dd72d1..82fb21f2ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -680,7 +680,6 @@ private void stopHeartbeat() throws LockException {
@Override
public ValidTxnList getValidTxns() throws LockException {
- assert isTxnOpen();
init();
try {
return getMS().getValidTxns(txnId);
@@ -692,7 +691,6 @@ public ValidTxnList getValidTxns() throws LockException {
@Override
public ValidTxnWriteIdList getValidWriteIds(List tableList,
String validTxnList) throws LockException {
- assert isTxnOpen();
assert validTxnList != null && !validTxnList.isEmpty();
try {
return TxnCommonUtils.createValidTxnWriteIdList(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 17a2d20a00..77efa676e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -53,10 +53,12 @@
private HiveLockManagerCtx lockManagerCtx;
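+ // Minimal transaction bookkeeping so isTxnOpen() stays consistent with openTxn()/commitTxn()/rollbackTxn().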
+ private long txnId = 0;
+ private int numTxn = 0;
+
@Override
public long openTxn(Context ctx, String user) throws LockException {
- // No-op
- return 0L;
+ numTxn++;
+ return txnId++;
}
@Override
public List replOpenTxn(String replPolicy, List srcTxnIds, String user) throws LockException {
@@ -65,11 +67,11 @@ public long openTxn(Context ctx, String user) throws LockException {
@Override
public boolean isTxnOpen() {
- return false;
+ return numTxn != 0;
}
@Override
public long getCurrentTxnId() {
- return 0L;
+ return txnId;
}
@Override
public int getStmtIdAndIncrement() {
@@ -228,7 +230,7 @@ public void releaseLocks(List hiveLocks) throws LockException {
@Override
public void commitTxn() throws LockException {
- // No-op
+ numTxn--;
}
@Override
@@ -238,7 +240,7 @@ public void replCommitTxn(CommitTxnRequest rqst) throws LockException {
@Override
public void rollbackTxn() throws LockException {
- // No-op
+ numTxn--;
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 691f3ee2e9..1df095baa2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -111,6 +111,7 @@
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc;
import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableDropPartitionDesc;
@@ -119,6 +120,7 @@
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils.TableSnapshot;
import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
@@ -326,7 +328,12 @@ private static Hive getInternal(HiveConf c, boolean needsRefresh, boolean isFast
}
db = create(c, doRegisterAllFns);
}
- if (c != null) {
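+ // Carry the cached table write-id list from the previous conf over to the new one so the snapshot is preserved across the conf swap.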
+ if (c != null && db.conf != c) {
+ if (db.conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY) != null) {
+ c.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, db.conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+ } else {
+ c.unset(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
+ }
db.conf = c;
}
return db;
@@ -690,6 +697,7 @@ public void alterTable(String catName, String dbName, String tblName, Table newT
EnvironmentContext environmentContext, boolean transactional, long replWriteId)
throws HiveException {
+ boolean txnOpened = false;
if (catName == null) {
catName = getDefaultCatalog(conf);
}
@@ -709,6 +717,12 @@ public void alterTable(String catName, String dbName, String tblName, Table newT
// Take a table snapshot and set it to newTbl.
AcidUtils.TableSnapshot tableSnapshot = null;
if (transactional) {
+ if (AcidUtils.isTransactionalTable(newTbl) && newTbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString()) == null) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, newTbl);
+ }
+
if (replWriteId > 0) {
// We need a valid writeId list for a transactional table modification. During
// replication we do not have a valid writeId list which was used to modify the table
@@ -738,6 +752,12 @@ public void alterTable(String catName, String dbName, String tblName, Table newT
throw new HiveException("Unable to alter table. " + e.getMessage(), e);
} catch (TException e) {
throw new HiveException("Unable to alter table. " + e.getMessage(), e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
@@ -789,6 +809,7 @@ public void alterPartition(String tblName, Partition newPart,
public void alterPartition(String catName, String dbName, String tblName, Partition newPart,
EnvironmentContext environmentContext, boolean transactional)
throws InvalidOperationException, HiveException {
+ boolean txnOpened = false;
try {
if (catName == null) {
catName = getDefaultCatalog(conf);
@@ -802,8 +823,14 @@ public void alterPartition(String catName, String dbName, String tblName, Partit
if (environmentContext == null) {
environmentContext = new EnvironmentContext();
}
+
AcidUtils.TableSnapshot tableSnapshot = null;
if (transactional) {
+ if (AcidUtils.isTransactionalTable(newPart.getTable()) && newPart.getTable().getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString()) == null) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, newPart.getTable());
+ }
tableSnapshot = AcidUtils.getTableSnapshot(conf, newPart.getTable(), true);
if (tableSnapshot != null) {
newPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
@@ -819,6 +846,12 @@ public void alterPartition(String catName, String dbName, String tblName, Partit
throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
} catch (TException e) {
throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
@@ -846,12 +879,18 @@ private void validatePartition(Partition newPart) throws HiveException {
public void alterPartitions(String tblName, List newParts,
EnvironmentContext environmentContext, boolean transactional)
throws InvalidOperationException, HiveException {
+ boolean txnOpened = false;
String[] names = Utilities.getDbTableName(tblName);
List newTParts =
new ArrayList();
try {
AcidUtils.TableSnapshot tableSnapshot = null;
if (transactional) {
+ if (AcidUtils.isTransactionalTable(newParts.get(0).getTable()) && newParts.get(0).getTable().getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString()) == null) {
+ // Advance writeId for ddl on transactional table
+ txnOpened = openTxnIfNeeded();
+ AcidUtils.advanceWriteId(conf, newParts.get(0).getTable());
+ }
tableSnapshot = AcidUtils.getTableSnapshot(conf, newParts.get(0).getTable(), true);
}
// Remove the DDL time so that it gets refreshed
@@ -873,6 +912,12 @@ public void alterPartitions(String tblName, List newParts,
throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
} catch (TException e) {
throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
/**
@@ -889,6 +934,7 @@ public void alterPartitions(String tblName, List newParts,
public void renamePartition(Table tbl, Map oldPartSpec, Partition newPart,
long replWriteId)
throws HiveException {
+ boolean txnOpened = false;
try {
Map newPartSpec = newPart.getSpec();
if (oldPartSpec.keySet().size() != tbl.getPartCols().size()
@@ -912,6 +958,10 @@ public void renamePartition(Table tbl, Map oldPartSpec, Partitio
String validWriteIds = null;
if (AcidUtils.isTransactionalTable(tbl)) {
TableSnapshot tableSnapshot;
+ if (AcidUtils.isTransactionalTable(tbl) && tbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString()) == null) {
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, tbl);
+ }
if (replWriteId > 0) {
// We need a valid writeId list for a transactional table modification. During
// replication we do not have a valid writeId list which was used to modify the table
@@ -941,6 +991,12 @@ public void renamePartition(Table tbl, Map oldPartSpec, Partitio
throw new HiveException("Unable to rename partition. " + e.getMessage(), e);
} catch (TException e) {
throw new HiveException("Unable to rename partition. " + e.getMessage(), e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
@@ -1000,6 +1056,7 @@ public void createTable(Table tbl, boolean ifNotExists,
List defaultConstraints,
List checkConstraints)
throws HiveException {
+ boolean txnOpened = false;
try {
if (tbl.getDbName() == null || "".equals(tbl.getDbName().trim())) {
tbl.setDbName(SessionState.get().getCurrentDatabase());
@@ -1024,6 +1081,11 @@ public void createTable(Table tbl, boolean ifNotExists,
tTbl.setPrivileges(principalPrivs);
}
}
+ if (AcidUtils.isTransactionalTable(tbl) && tbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString()) == null) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, tbl);
+ }
// Set table snapshot to api.Table to make it persistent. A transactional table being
// replicated may have a valid write Id copied from the source. Use that instead of
// crafting one on the replica.
@@ -1049,6 +1111,12 @@ public void createTable(Table tbl, boolean ifNotExists,
}
} catch (Exception e) {
throw new HiveException(e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
@@ -1143,7 +1211,18 @@ public void dropTable(String dbName, String tableName, boolean deleteData,
*/
public void dropTable(String dbName, String tableName, boolean deleteData,
boolean ignoreUnknownTab, boolean ifPurge) throws HiveException {
+ boolean txnOpened = false;
try {
+ Table tbl = null;
+ try {
+ tbl = getTable(dbName, tableName);
+ } catch (InvalidTableException e) {
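+ // Table may not exist; ignore and let the metastore dropTable call below handle it.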
+ }
+ if (tbl != null && AcidUtils.isTransactionalTable(tbl) && tbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString()) == null) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, tbl);
+ }
getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab, ifPurge);
} catch (NoSuchObjectException e) {
if (!ignoreUnknownTab) {
@@ -1158,6 +1237,12 @@ public void dropTable(String dbName, String tableName, boolean deleteData,
throw new HiveException(e);
} catch (Exception e) {
throw new HiveException(e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
@@ -1171,10 +1256,17 @@ public void dropTable(String dbName, String tableName, boolean deleteData,
* @throws HiveException
*/
public void truncateTable(String dbDotTableName, Map partSpec, Long writeId) throws HiveException {
+ boolean txnOpened = false;
try {
Table table = getTable(dbDotTableName, true);
+
AcidUtils.TableSnapshot snapshot = null;
- if (AcidUtils.isTransactionalTable(table)) {
+ if (AcidUtils.isTransactionalTable(table) && table.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString()) == null) {
+
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, table);
+
if (writeId <= 0) {
snapshot = AcidUtils.getTableSnapshot(conf, table, true);
} else {
@@ -1195,6 +1287,12 @@ public void truncateTable(String dbDotTableName, Map partSpec, L
}
} catch (Exception e) {
throw new HiveException(e);
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
}
}
@@ -1261,7 +1359,7 @@ public Table getTable(final String dbName, final String tableName) throws HiveEx
*/
public Table getTable(final String dbName, final String tableName,
boolean throwException) throws HiveException {
- return this.getTable(dbName, tableName, throwException, false);
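+ // Default to checkTransactional=true so transactional tables are read with a valid write-id snapshot.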
+ return this.getTable(dbName, tableName, throwException, true);
}
/**
@@ -1311,20 +1409,7 @@ public Table getTable(final String dbName, final String tableName, boolean throw
// Get the table from metastore
org.apache.hadoop.hive.metastore.api.Table tTable = null;
try {
- // Note: this is currently called w/true from StatsOptimizer only.
- if (checkTransactional) {
- ValidWriteIdList validWriteIdList = null;
- long txnId = SessionState.get().getTxnMgr() != null ?
- SessionState.get().getTxnMgr().getCurrentTxnId() : 0;
- if (txnId > 0) {
- validWriteIdList = AcidUtils.getTableValidWriteIdListWithTxnList(conf,
- dbName, tableName);
- }
- tTable = getMSC().getTable(getDefaultCatalog(conf), dbName, tableName,
- validWriteIdList != null ? validWriteIdList.toString() : null, getColumnStats);
- } else {
- tTable = getMSC().getTable(dbName, tableName, getColumnStats);
- }
+ tTable = getMSC().getTable(getDefaultCatalog(conf), dbName, tableName, checkTransactional, getColumnStats);
} catch (NoSuchObjectException e) {
if (throwException) {
throw new InvalidTableException(tableName);
@@ -1984,48 +2069,61 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par
boolean isSrcLocal, boolean isAcidIUDoperation,
boolean resetStatistics, Long writeId,
int stmtId, boolean isInsertOverwrite) throws HiveException {
+ boolean txnOpened = false;
+ try {
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
-
- // Get the partition object if it already exists
- Partition oldPart = getPartition(tbl, partSpec, false);
- boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
-
- // If config is set, table is not temporary and partition being inserted exists, capture
- // the list of files added. For not yet existing partitions (insert overwrite to new partition
- // or dynamic partition inserts), the add partition event will capture the list of files added.
- List newFiles = Collections.synchronizedList(new ArrayList<>());
+ // Get the partition object if it already exists
+ Partition oldPart = getPartition(tbl, partSpec, false);
+ boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
- Partition newTPart = loadPartitionInternal(loadPath, tbl, partSpec, oldPart,
- loadFileType, inheritTableSpecs,
- inheritLocation, isSkewedStoreAsSubdir, isSrcLocal, isAcidIUDoperation,
- resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+ // If config is set, table is not temporary and partition being inserted exists, capture
+ // the list of files added. For not yet existing partitions (insert overwrite to new partition
+ // or dynamic partition inserts), the add partition event will capture the list of files added.
+ List newFiles = Collections.synchronizedList(new ArrayList<>());
- AcidUtils.TableSnapshot tableSnapshot = isTxnTable ? getTableSnapshot(tbl, writeId) : null;
- if (tableSnapshot != null) {
- newTPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
- }
+ Partition newTPart = loadPartitionInternal(loadPath, tbl, partSpec, oldPart,
+ loadFileType, inheritTableSpecs,
+ inheritLocation, isSkewedStoreAsSubdir, isSrcLocal, isAcidIUDoperation,
+ resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
- if (oldPart == null) {
- addPartitionToMetastore(newTPart, resetStatistics, tbl, tableSnapshot);
- // For acid table, add the acid_write event with file list at the time of load itself. But
- // it should be done after partition is created.
- if (isTxnTable && (null != newFiles)) {
- addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
+ if (AcidUtils.isTransactionalTable(tbl) && tbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString()) == null) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, tbl);
}
- } else {
- try {
- setStatsPropAndAlterPartition(resetStatistics, tbl, newTPart, tableSnapshot);
- } catch (TException e) {
- LOG.error(StringUtils.stringifyException(e));
- throw new HiveException(e);
+ AcidUtils.TableSnapshot tableSnapshot = isTxnTable ? getTableSnapshot(tbl, writeId) : null;
+ if (tableSnapshot != null) {
+ newTPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
}
- }
- perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
+ if (oldPart == null) {
+ addPartitionToMetastore(newTPart, resetStatistics, tbl, tableSnapshot);
+ // For acid table, add the acid_write event with file list at the time of load itself. But
+ // it should be done after partition is created.
+ if (isTxnTable && (null != newFiles)) {
+ addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
+ }
+ } else {
+ try {
+ setStatsPropAndAlterPartition(resetStatistics, tbl, newTPart, tableSnapshot);
+ } catch (TException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new HiveException(e);
+ }
+ }
+
+ perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
- return newTPart;
+ return newTPart;
+ } finally {
+ if (txnOpened) {
+ if (SessionState.get().getTxnMgr().isTxnOpen()) {
+ SessionState.get().getTxnMgr().commitTxn();
+ }
+ }
+ }
}
/**
@@ -2633,206 +2731,219 @@ private void constructOneLBLocationMap(FileStatus fSta,
final int numDP, final int numLB, final boolean isAcid, final long writeId, final int stmtId,
final boolean resetStatistics, final AcidUtils.Operation operation,
boolean isInsertOverwrite) throws HiveException {
+ boolean txnOpened = false;
+ try {
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
-
- // Get all valid partition paths and existing partitions for them (if any)
- final Table tbl = getTable(tableName);
- final Set validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId,
- AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite);
-
- final int partsToLoad = validPartitions.size();
- final AtomicInteger partitionsLoaded = new AtomicInteger(0);
- final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
- && InPlaceUpdate.canRenderInPlace(conf) && !SessionState.getConsole().getIsSilent();
- final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
-
- final SessionState parentSession = SessionState.get();
- List> tasks = Lists.newLinkedList();
+ // Get all valid partition paths and existing partitions for them (if any)
+ final Table tbl = getTable(tableName);
+ final Set validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId,
+ AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite);
- final class PartitionDetails {
- Map fullSpec;
- Partition partition;
- List newFiles;
- boolean hasOldPartition = false;
- AcidUtils.TableSnapshot tableSnapshot;
- }
+ final int partsToLoad = validPartitions.size();
+ final AtomicInteger partitionsLoaded = new AtomicInteger(0);
+ final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
+ && InPlaceUpdate.canRenderInPlace(conf) && !SessionState.getConsole().getIsSilent();
+ final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
- Map partitionDetailsMap =
- Collections.synchronizedMap(new LinkedHashMap<>());
+ final SessionState parentSession = SessionState.get();
+ List> tasks = Lists.newLinkedList();
- // calculate full path spec for each valid partition path
- validPartitions.forEach(partPath -> {
- Map fullPartSpec = Maps.newLinkedHashMap(partSpec);
- if (!Warehouse.makeSpecFromName(fullPartSpec, partPath, new HashSet<>(partSpec.keySet()))) {
- Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath);
- } else {
- PartitionDetails details = new PartitionDetails();
- details.fullSpec = fullPartSpec;
- partitionDetailsMap.put(partPath, details);
+ final class PartitionDetails {
+ Map fullSpec;
+ Partition partition;
+ List newFiles;
+ boolean hasOldPartition = false;
+ AcidUtils.TableSnapshot tableSnapshot;
}
- });
- // fetch all the partitions matching the part spec using the partition iterable
- // this way the maximum batch size configuration parameter is considered
- PartitionIterable partitionIterable = new PartitionIterable(Hive.get(), tbl, partSpec,
- conf.getInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 300));
- Iterator iterator = partitionIterable.iterator();
+ Map partitionDetailsMap =
+ Collections.synchronizedMap(new LinkedHashMap<>());
- // Match valid partition path to partitions
- while (iterator.hasNext()) {
- Partition partition = iterator.next();
- partitionDetailsMap.entrySet().stream()
- .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec()))
- .findAny().ifPresent(entry -> {
- entry.getValue().partition = partition;
- entry.getValue().hasOldPartition = true;
- });
- }
+ // calculate full path spec for each valid partition path
+ validPartitions.forEach(partPath -> {
+ Map fullPartSpec = Maps.newLinkedHashMap(partSpec);
+ if (!Warehouse.makeSpecFromName(fullPartSpec, partPath, new HashSet<>(partSpec.keySet()))) {
+ Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath);
+ } else {
+ PartitionDetails details = new PartitionDetails();
+ details.fullSpec = fullPartSpec;
+ partitionDetailsMap.put(partPath, details);
+ }
+ });
- boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
- AcidUtils.TableSnapshot tableSnapshot = isTxnTable ? getTableSnapshot(tbl, writeId) : null;
+ // fetch all the partitions matching the part spec using the partition iterable
+ // this way the maximum batch size configuration parameter is considered
+ PartitionIterable partitionIterable = new PartitionIterable(Hive.get(), tbl, partSpec,
+ conf.getInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 300));
+ Iterator iterator = partitionIterable.iterator();
- for (Entry entry : partitionDetailsMap.entrySet()) {
- tasks.add(() -> {
- PartitionDetails partitionDetails = entry.getValue();
- Map fullPartSpec = partitionDetails.fullSpec;
- try {
+ // Match valid partition path to partitions
+ while (iterator.hasNext()) {
+ Partition partition = iterator.next();
+ partitionDetailsMap.entrySet().stream()
+ .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec()))
+ .findAny().ifPresent(entry -> {
+ entry.getValue().partition = partition;
+ entry.getValue().hasOldPartition = true;
+ });
+ }
- SessionState.setCurrentSessionState(parentSession);
- LOG.info("New loading path = " + entry.getKey() + " withPartSpec " + fullPartSpec);
+ boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
+ if (isTxnTable && tbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString()) == null) {
+ txnOpened = openTxnIfNeeded();
+ // Advance writeId for ddl on transactional table
+ AcidUtils.advanceWriteId(conf, tbl);
+ }
+ AcidUtils.TableSnapshot tableSnapshot = isTxnTable ? getTableSnapshot(tbl, writeId) : null;
- List newFiles = Lists.newArrayList();
- Partition oldPartition = partitionDetails.partition;
- // load the partition
- Partition partition = loadPartitionInternal(entry.getKey(), tbl,
- fullPartSpec, oldPartition, loadFileType, true, false, numLB > 0, false, isAcid,
- resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
- // if the partition already existed before the loading, no need to add it again to the
- // metastore
+ for (Entry entry : partitionDetailsMap.entrySet()) {
+ tasks.add(() -> {
+ PartitionDetails partitionDetails = entry.getValue();
+ Map fullPartSpec = partitionDetails.fullSpec;
+ try {
- if (tableSnapshot != null) {
- partition.getTPartition().setWriteId(tableSnapshot.getWriteId());
- }
- partitionDetails.tableSnapshot = tableSnapshot;
- if (oldPartition == null) {
- partitionDetails.newFiles = newFiles;
- partitionDetails.partition = partition;
- }
+ SessionState.setCurrentSessionState(parentSession);
+ LOG.info("New loading path = " + entry.getKey() + " withPartSpec " + fullPartSpec);
+
+ List newFiles = Lists.newArrayList();
+ Partition oldPartition = partitionDetails.partition;
+ // load the partition
+ Partition partition = loadPartitionInternal(entry.getKey(), tbl,
+ fullPartSpec, oldPartition, loadFileType, true, false, numLB > 0, false, isAcid,
+ resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+ // if the partition already existed before the loading, no need to add it again to the
+ // metastore
+
+ if (tableSnapshot != null) {
+ partition.getTPartition().setWriteId(tableSnapshot.getWriteId());
+ }
+ partitionDetails.tableSnapshot = tableSnapshot;
+ if (oldPartition == null) {
+ partitionDetails.newFiles = newFiles;
+ partitionDetails.partition = partition;
+ }
- if (inPlaceEligible) {
- synchronized (ps) {
- InPlaceUpdate.rePositionCursor(ps);
- partitionsLoaded.incrementAndGet();
- InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
- + partsToLoad + " partitions.");
+ if (inPlaceEligible) {
+ synchronized (ps) {
+ InPlaceUpdate.rePositionCursor(ps);
+ partitionsLoaded.incrementAndGet();
+ InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+ + partsToLoad + " partitions.");
+ }
}
+
+ return partition;
+ } catch (Exception e) {
+ LOG.error("Exception when loading partition with parameters "
+ + " partPath=" + entry.getKey() + ", "
+ + " table=" + tbl.getTableName() + ", "
+ + " partSpec=" + fullPartSpec + ", "
+ + " loadFileType=" + loadFileType.toString() + ", "
+ + " listBucketingLevel=" + numLB + ", "
+ + " isAcid=" + isAcid + ", "
+ + " resetStatistics=" + resetStatistics, e);
+ throw e;
}
+ });
+ }
- return partition;
- } catch (Exception e) {
- LOG.error("Exception when loading partition with parameters "
- + " partPath=" + entry.getKey() + ", "
- + " table=" + tbl.getTableName() + ", "
- + " partSpec=" + fullPartSpec + ", "
- + " loadFileType=" + loadFileType.toString() + ", "
- + " listBucketingLevel=" + numLB + ", "
- + " isAcid=" + isAcid + ", "
- + " resetStatistics=" + resetStatistics, e);
- throw e;
+ int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
+ ExecutorService executor = Executors.newFixedThreadPool(poolSize,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("load-dynamic-partitionsToAdd-%d").build());
+
+ List> futures = Lists.newLinkedList();
+ Map