diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 2688f35..a9f0ce5 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -1078,6 +1078,132 @@ public void testDumpLimit() throws IOException {
   }
 
   @Test
+  public void testTruncateTable() throws IOException {
+    String testName = "truncateTable";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] unptn_data = new String[] { "eleven", "twelve" };
+    String[] empty = new String[] {};
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+
+    run("TRUNCATE TABLE " + dbName + ".unptned");
+    verifySetup("SELECT a from " + dbName + ".unptned", empty);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned", empty);
+
+    String[] unptn_data_after_ins = new String[] { "thirteen" };
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_after_ins[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_after_ins);
+  }
+
+  @Test
+  public void testTruncatePartitionedTable() throws IOException {
+    String testName = "truncatePartitionedTable";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".ptned_1(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned_2(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+
+    String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+    String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+    String[] empty = new String[] {};
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[2] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[2] + "')");
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[2] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[2] + "')");
+
+    verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=10) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=20) ORDER BY a", ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20) ORDER BY a", ptn_data_2);
+
+    run("TRUNCATE TABLE " + dbName + ".ptned_1 PARTITION(b=2)");
+    verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=2)", empty);
+
+    run("TRUNCATE TABLE " + dbName + ".ptned_2");
+    verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=10)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=20)", empty);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2)", empty);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10)", empty);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20)", empty);
+  }
+
+  @Test
 public void testStatus() throws IOException {
   // first test ReplStateMap functionality
   Map cmap = new ReplStateMap();
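Both new tests repeat the same incremental cycle several times: advance the dump directory, run REPL DUMP ... FROM the last event id, pick up the new dump location and id, and REPL LOAD it into the "<dbName>_dupe" copy. A small helper along the following lines could fold that boilerplate; this is only a sketch against the existing advanceDumpDir/run/getResult/LOG members used above, and the helper name is made up here.

    // Hypothetical helper for TestReplicationScenarios: performs one incremental
    // dump from the given event id, replays it into <dbName>_dupe, and returns
    // the new dump id so the caller can chain the next increment.
    private String incrementalDumpAndLoad(String dbName, String fromReplDumpId) throws IOException {
      advanceDumpDir();
      run("REPL DUMP " + dbName + " FROM " + fromReplDumpId);
      String incrementalDumpLocn = getResult(0, 0);
      String incrementalDumpId = getResult(0, 1, true);
      LOG.info("Incremental-Dump: Dumped to {} with id {} from {}",
          incrementalDumpLocn, incrementalDumpId, fromReplDumpId);
      run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
      return incrementalDumpId;
    }

With such a helper, each truncate scenario would reduce to: mutate the source table, call the helper, then verifyRun against both the source and the _dupe database.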
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
index b0defb5..868e5a5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
@@ -75,19 +75,17 @@ public void onDropTable (DropTableEvent tableEvent) throws MetaException {
   }
 
   /**
-   * @param add partition event
-   * @throws MetaException
-   */
-
-  /**
    * @param tableEvent alter table event
    * @throws MetaException
    */
   public void onAlterTable (AlterTableEvent tableEvent) throws MetaException {
   }
 
-  public void onAddPartition (AddPartitionEvent partitionEvent)
-      throws MetaException {
+  /**
+   * @param partitionEvent add partition event
+   * @throws MetaException
+   */
+  public void onAddPartition (AddPartitionEvent partitionEvent) throws MetaException {
   }
 
   /**
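The javadoc above belongs to the metastore listener hooks that downstream components (including the notification log that feeds REPL DUMP) implement by subclassing MetaStoreEventListener. As a point of reference only, a minimal listener overriding the onAddPartition hook whose javadoc is corrected here could look like this; the class name and logging are illustrative and not part of this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative listener: logs the table name for every add-partition event.
    public class LoggingPartitionListener extends MetaStoreEventListener {
      private static final Logger LOG = LoggerFactory.getLogger(LoggingPartitionListener.class);

      public LoggingPartitionListener(Configuration config) {
        super(config);
      }

      @Override
      public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
        LOG.info("Partition(s) added to table {}", partitionEvent.getTable().getTableName());
      }
    }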
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 917e565..f95d085 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4687,6 +4687,9 @@ private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws H
           }
         }
       }
+
+      // Update the table stats using alterTable/Partition operations
+      updateTableStats(db, table, partSpec);
     } catch (Exception e) {
       throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
     }
@@ -4724,39 +4727,49 @@ private int exchangeTablePartition(Hive db,
       if (table.isPartitioned()) {
         for (Partition partition : db.getPartitions(table)) {
           locations.add(partition.getDataLocation());
-          EnvironmentContext environmentContext = new EnvironmentContext();
-          if (needToUpdateStats(partition.getParameters(), environmentContext)) {
-            db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
-          }
         }
       } else {
         locations.add(table.getPath());
-        EnvironmentContext environmentContext = new EnvironmentContext();
-        if (needToUpdateStats(table.getParameters(), environmentContext)) {
-          db.alterTable(table.getDbName()+"."+table.getTableName(), table, environmentContext);
-        }
       }
     } else {
       for (Partition partition : db.getPartitionsByNames(table, partSpec)) {
         locations.add(partition.getDataLocation());
-        EnvironmentContext environmentContext = new EnvironmentContext();
-        if (needToUpdateStats(partition.getParameters(), environmentContext)) {
+      }
+    }
+    return locations;
+  }
+
+  private void updateTableStats(Hive db, Table table, Map partSpec)
+      throws HiveException, InvalidOperationException {
+    if (partSpec == null) {
+      if (table.isPartitioned()) {
+        for (Partition partition : db.getPartitions(table)) {
+          EnvironmentContext environmentContext = new EnvironmentContext();
+          updateStatsForTruncate(partition.getParameters(), environmentContext);
           db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
         }
+      } else {
+        EnvironmentContext environmentContext = new EnvironmentContext();
+        updateStatsForTruncate(table.getParameters(), environmentContext);
+        db.alterTable(table.getDbName()+"."+table.getTableName(), table, environmentContext);
+      }
+    } else {
+      for (Partition partition : db.getPartitionsByNames(table, partSpec)) {
+        EnvironmentContext environmentContext = new EnvironmentContext();
+        updateStatsForTruncate(partition.getParameters(), environmentContext);
+        db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
      }
    }
-    return locations;
+    return;
  }
 
-  private boolean needToUpdateStats(Map props, EnvironmentContext environmentContext) {
+  private void updateStatsForTruncate(Map props, EnvironmentContext environmentContext) {
    if (null == props) {
-      return false;
+      return;
    }
-    boolean statsPresent = false;
    for (String stat : StatsSetupConst.supportedStats) {
      String statVal = props.get(stat);
-      if (statVal != null && Long.parseLong(statVal) > 0) {
-        statsPresent = true;
+      if (statVal != null) {
        //In the case of truncate table, we set the stats to be 0.
        props.put(stat, "0");
      }
@@ -4766,7 +4779,7 @@ private boolean needToUpdateStats(Map props, EnvironmentContext e
    environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
    //then invalidate column stats
    StatsSetupConst.clearColumnStatsState(props);
-    return statsPresent;
+    return;
  }
 
  @Override
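The DDLTask change replaces the old needToUpdateStats check with an unconditional update: truncateTable now always calls updateTableStats, and updateStatsForTruncate zeroes every basic stat that is present (even if its stored value was already 0), marks the stats as task-generated, and clears column statistics, so the metastore's view matches the emptied data directory. The fragment below restates just the parameter rewrite as a self-contained illustration; the stat keys mirror StatsSetupConst.supportedStats as seen in the q.out change at the end of this patch.

    import java.util.HashMap;
    import java.util.Map;

    // Illustration of the stats rewrite applied to table/partition parameters on TRUNCATE.
    public class TruncateStatsExample {
      private static final String[] BASIC_STATS = {"numFiles", "numRows", "totalSize", "rawDataSize"};

      static void zeroBasicStats(Map<String, String> params) {
        for (String stat : BASIC_STATS) {
          if (params.get(stat) != null) {
            // Truncate removed all data, so every recorded basic stat drops to 0.
            params.put(stat, "0");
          }
        }
      }

      public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("numFiles", "2");
        params.put("totalSize", "547");
        zeroBasicStats(params);
        System.out.println(params); // numFiles and totalSize both read 0 now
      }
    }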
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 245c483..ea00f29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -505,7 +505,7 @@ private static void checkTable(Table table, ImportTableDesc tableDesc, Replicati
       }
     }
 
-    // Next, we verify that the destination table is not offline, a view, or a non-native table
+    // Next, we verify that the destination table is not offline, or a non-native table
     EximUtil.validateTable(table);
 
     // If the import statement specified that we're importing to an external
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 05d7be1..3708c20 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.IOUtils;
@@ -122,8 +123,10 @@ EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"),
     EVENT_ALTER_TABLE("EVENT_ALTER_TABLE"),
     EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"),
+    EVENT_TRUNCATE_TABLE("EVENT_TRUNCATE_TABLE"),
     EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
     EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
+    EVENT_TRUNCATE_PARTITION("EVENT_TRUNCATE_PARTITION"),
     EVENT_INSERT("EVENT_INSERT"),
     EVENT_UNKNOWN("EVENT_UNKNOWN");
@@ -445,8 +448,7 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Ex
         conf,
         getNewEventOnlyReplicationSpec(ev.getEventId())
     );
-    EventHandlerFactory.handlerFor(ev).handle(context);
-
+    EventHandlerFactory.handlerFor(ev, context).handle();
   }
 
   public static void injectNextDumpDirForTest(String dumpdir){
@@ -932,6 +934,26 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
          }
        }
      }
+      case EVENT_TRUNCATE_TABLE: {
+        AlterTableMessage truncateTableMessage = md.getAlterTableMessage(dmd.getPayload());
+        String actualDbName = ((dbName == null) || dbName.isEmpty() ? truncateTableMessage.getDB() : dbName);
+        String actualTblName = ((tblName == null) || tblName.isEmpty() ? truncateTableMessage.getTable() : tblName);
+
+        // Replay the alter table first, in case the captured alter event happened after the truncate.
+        List<Task<? extends Serializable>> tasks =
+            analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
+
+        TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
+            actualDbName + "." + actualTblName, null);
+        Task truncateTableTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf);
+        if (precursor != null) {
+          precursor.addDependentTask(truncateTableTask);
+        }
+        tasks.add(truncateTableTask);
+        LOG.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(), truncateTableDesc.getTableName());
+        dbsUpdated.put(actualDbName, dmd.getEventTo());
+        return tasks;
+      }
      case EVENT_ALTER_PARTITION: {
        return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
      }
@@ -973,6 +995,42 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
        tablesUpdated.put(tableName, dmd.getEventTo());
        return tasks;
      }
+      case EVENT_TRUNCATE_PARTITION: {
+        AlterPartitionMessage truncatePtnMessage = md.getAlterPartitionMessage(dmd.getPayload());
+        String actualDbName = ((dbName == null) || dbName.isEmpty() ? truncatePtnMessage.getDB() : dbName);
+        String actualTblName = ((tblName == null) || tblName.isEmpty() ? truncatePtnMessage.getTable() : tblName);
+
+        Map partSpec = new LinkedHashMap();
+        try {
+          org.apache.hadoop.hive.metastore.api.Table tblObj = truncatePtnMessage.getTableObj();
+          org.apache.hadoop.hive.metastore.api.Partition pobjAfter = truncatePtnMessage.getPtnObjAfter();
+          Iterator afterValIter = pobjAfter.getValuesIterator();
+          for (FieldSchema fs : tblObj.getPartitionKeys()){
+            partSpec.put(fs.getName(), afterValIter.next());
+          }
+        } catch (Exception e) {
+          if (!(e instanceof SemanticException)){
+            throw new SemanticException("Error reading message members", e);
+          } else {
+            throw (SemanticException)e;
+          }
+        }
+
+        // Replay the alter partition first, in case the captured alter event happened after the truncate.
+        List<Task<? extends Serializable>> tasks =
+            analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
+
+        TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
+            actualDbName + "." + actualTblName, partSpec);
+        Task truncatePtnTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf);
+        if (precursor != null) {
+          precursor.addDependentTask(truncatePtnTask);
+        }
+        tasks.add(truncatePtnTask);
+        LOG.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(), truncateTableDesc.getTableName());
+        dbsUpdated.put(actualDbName, dmd.getEventTo());
+        return tasks;
+      }
      case EVENT_INSERT: {
        md = MessageFactory.getInstance().getDeserializer();
        InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
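On the load side both truncate events are replayed as "alter first, then truncate": the metadata alter produced by analyzeTableLoad runs ahead of a DDL task built from TruncateTableDesc, so an alter captured after the truncate is not lost. For the partition variant the analyzer also has to rebuild the partSpec by pairing the table's partition keys with the values in the partition's after-image; that pairing step is shown below as a standalone sketch (class and method names are illustrative).

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hive.metastore.api.FieldSchema;

    // Illustration of the partSpec reconstruction used for EVENT_TRUNCATE_PARTITION:
    // keys come from the table object, values from the partition after-image.
    public class PartSpecFromMessageExample {
      static Map<String, String> partSpec(List<FieldSchema> partitionKeys, Iterator<String> afterValues) {
        Map<String, String> spec = new LinkedHashMap<>();
        for (FieldSchema fs : partitionKeys) {
          spec.put(fs.getName(), afterValues.next());
        }
        return spec;
      }

      public static void main(String[] args) {
        List<FieldSchema> keys = Arrays.asList(new FieldSchema("b", "int", null));
        Map<String, String> spec = partSpec(keys, Arrays.asList("2").iterator());
        System.out.println(spec); // {b=2} -> TRUNCATE TABLE ... PARTITION(b=2)
      }
    }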
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
index ab059c2..ee2d9f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
@@ -27,10 +27,12 @@ static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class);
 
   final NotificationEvent event;
+  final Context withinContext;
   final MessageDeserializer deserializer;
 
-  AbstractHandler(NotificationEvent event) {
+  AbstractHandler(NotificationEvent event, Context withinContext) {
     this.event = event;
+    this.withinContext = withinContext;
     deserializer = MessageFactory.getInstance().getDeserializer();
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
index 9a4f8b9..7344ada 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
@@ -38,12 +38,12 @@ import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
 
 public class AddPartitionHandler extends AbstractHandler {
-  protected AddPartitionHandler(NotificationEvent notificationEvent) {
-    super(notificationEvent);
+  protected AddPartitionHandler(NotificationEvent notificationEvent, Context withinContext) {
+    super(notificationEvent, withinContext);
   }
 
   @Override
-  public void handle(Context withinContext) throws Exception {
+  public void handle() throws Exception {
     AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage());
     LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage());
     Iterable ptns = apm.getPartitionObjs();
@@ -89,7 +89,7 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
       Iterable files = partitionFilesIter.next().getFiles();
       if (files != null) {
         // encoded filename/checksum of files, write into _files
-        try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
+        try (BufferedWriter fileListWriter = writer(qlPtn)) {
           for (String file : files) {
             fileListWriter.write(file + "\n");
           }
@@ -99,7 +99,7 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
     withinContext.createDmd(this).write();
   }
 
-  private BufferedWriter writer(Context withinContext, Partition qlPtn)
+  private BufferedWriter writer(Partition qlPtn)
       throws IOException {
     Path ptnDataPath = new Path(withinContext.eventRoot, qlPtn.getName());
     FileSystem fs = ptnDataPath.getFileSystem(withinContext.hiveConf);
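AddPartitionHandler, like CreateTableHandler and InsertHandler further down, materializes the data side of an event as a _files manifest: one encoded file reference per line under the event root, written through the Context that is now a constructor-supplied field rather than a handle() argument. The pattern is small enough to show in isolation; the paths and file list below are placeholders.

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustration of the "_files" manifest written by the dump-side handlers.
    public class FilesManifestExample {
      static void writeManifest(Path eventRoot, List<String> files, Configuration conf) throws IOException {
        Path filesPath = new Path(eventRoot, "_files"); // EximUtil.FILES_NAME
        FileSystem fs = filesPath.getFileSystem(conf);
        try (BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)))) {
          for (String file : files) {
            out.write(file + "\n"); // one encoded filename/checksum per line
          }
        }
      }

      public static void main(String[] args) throws IOException {
        writeManifest(new Path("file:///tmp/repl/ev_42"), // placeholder event root
            Arrays.asList("hdfs://nn/warehouse/t/000000_0#checksum"), new Configuration());
      }
    }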
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
index 1073cd0..a906fba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
@@ -17,12 +17,17 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.events;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -36,8 +41,8 @@ private final org.apache.hadoop.hive.metastore.api.Table tableObject;
   private final Scenario scenario;
 
-  AlterPartitionHandler(NotificationEvent event) throws Exception {
-    super(event);
+  AlterPartitionHandler(NotificationEvent event, Context withinContext) throws Exception {
+    super(event, withinContext);
     AlterPartitionMessage apm = deserializer.getAlterPartitionMessage(event.getMessage());
     tableObject = apm.getTableObj();
     org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore();
@@ -57,13 +62,19 @@ DUMPTYPE dumpType() {
       DUMPTYPE dumpType() {
         return DUMPTYPE.EVENT_RENAME_PARTITION;
       }
+    },
+    TRUNCATE {
+      @Override
+      DUMPTYPE dumpType() {
+        return DUMPTYPE.EVENT_TRUNCATE_PARTITION;
+      }
     };
 
     abstract DUMPTYPE dumpType();
   }
 
-  private static Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before,
-      org.apache.hadoop.hive.metastore.api.Partition after) {
+  private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before,
+      org.apache.hadoop.hive.metastore.api.Partition after) throws SemanticException {
     Iterator beforeValIter = before.getValuesIterator();
     Iterator afterValIter = after.getValuesIterator();
     while(beforeValIter.hasNext()) {
@@ -71,14 +82,29 @@ private static Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partit
         return Scenario.RENAME;
       }
     }
-    return Scenario.ALTER;
+    return isTruncateOp(after) ? Scenario.TRUNCATE : Scenario.ALTER;
+  }
+
+  private boolean isTruncateOp(org.apache.hadoop.hive.metastore.api.Partition after) throws SemanticException {
+    try {
+      Path location = new Path(after.getSd().getLocation());
+      FileSystem fs = location.getFileSystem(withinContext.hiveConf);
+      FileStatus[] statuses = fs.listStatus(location, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      if ((statuses == null) || (statuses.length == 0)) {
+        return true;
+      }
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.IO_ERROR.getMsg(), e);
+    }
+
+    return false;
   }
 
   @Override
-  public void handle(Context withinContext) throws Exception {
+  public void handle() throws Exception {
     LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage());
-    if (Scenario.ALTER == scenario) {
+    if (Scenario.RENAME != scenario) {
       withinContext.replicationSpec.setIsMetadataOnly(true);
       Table qlMdTable = new Table(tableObject);
       List qlPtns = new ArrayList<>();
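The core of this handler change is how a truncate is recognized at dump time: there is no dedicated TRUNCATE notification, so a non-rename ALTER_PARTITION (and, in the next file, ALTER_TABLE) whose storage location holds no visible files is classified as Scenario.TRUNCATE. Restated as a standalone check, under the assumption that FileUtils.HIDDEN_FILES_PATH_FILTER skips dot/underscore entries as it does in these handlers:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.FileUtils;

    // Illustration of the dump-side truncate heuristic: an altered table/partition
    // whose data directory contains no visible files is treated as truncated.
    public class TruncateDetectionExample {
      static boolean looksTruncated(String location, Configuration conf) throws IOException {
        Path dataDir = new Path(location);
        FileSystem fs = dataDir.getFileSystem(conf);
        FileStatus[] visible = fs.listStatus(dataDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
        return visible == null || visible.length == 0;
      }
    }

Note that the check costs one listStatus call per non-rename alter event processed during REPL DUMP.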
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
index 04d9d79..fd63ea9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
@@ -17,11 +17,16 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.events;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
@@ -43,32 +48,55 @@ DUMPTYPE dumpType() {
       DUMPTYPE dumpType() {
         return DUMPTYPE.EVENT_RENAME_TABLE;
       }
+    },
+    TRUNCATE {
+      @Override
+      DUMPTYPE dumpType() {
+        return DUMPTYPE.EVENT_TRUNCATE_TABLE;
+      }
     };
 
     abstract DUMPTYPE dumpType();
   }
 
-  AlterTableHandler(NotificationEvent event) throws Exception {
-    super(event);
+  AlterTableHandler(NotificationEvent event, Context withinContext) throws Exception {
+    super(event, withinContext);
     AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage());
     before = atm.getTableObjBefore();
     after = atm.getTableObjAfter();
     scenario = scenarioType(before, after);
   }
 
-  private static Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before,
-      org.apache.hadoop.hive.metastore.api.Table after) {
-    return before.getDbName().equals(after.getDbName())
-        && before.getTableName().equals(after.getTableName())
-        ? Scenario.ALTER
-        : Scenario.RENAME;
+  private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before,
+      org.apache.hadoop.hive.metastore.api.Table after) throws SemanticException {
+    if (before.getDbName().equals(after.getDbName())
+        && before.getTableName().equals(after.getTableName())) {
+      return (isTruncateOp(after)) ? Scenario.TRUNCATE : Scenario.ALTER;
+    } else {
+      return Scenario.RENAME;
+    }
+  }
+
+  private boolean isTruncateOp(org.apache.hadoop.hive.metastore.api.Table after) throws SemanticException {
+    try {
+      Path location = new Path(after.getSd().getLocation());
+      FileSystem fs = location.getFileSystem(withinContext.hiveConf);
+      FileStatus[] statuses = fs.listStatus(location, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      if ((statuses == null) || (statuses.length == 0)) {
+        return true;
+      }
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.IO_ERROR.getMsg(), e);
+    }
+
+    return false;
   }
 
   @Override
-  public void handle(Context withinContext) throws Exception {
+  public void handle() throws Exception {
     {
       LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage());
-      if (Scenario.ALTER == scenario) {
+      if (Scenario.RENAME != scenario) {
         withinContext.replicationSpec.setIsMetadataOnly(true);
         Table qlMdTableAfter = new Table(after);
         Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
index 03f400d..6e140b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
@@ -32,12 +32,12 @@ public class CreateTableHandler extends AbstractHandler {
 
-  CreateTableHandler(NotificationEvent event) {
-    super(event);
+  CreateTableHandler(NotificationEvent event, Context withinContext) {
+    super(event, withinContext);
   }
 
   @Override
-  public void handle(Context withinContext) throws Exception {
+  public void handle() throws Exception {
     CreateTableMessage ctm = deserializer.getCreateTableMessage(event.getMessage());
     LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), event.getMessage());
     org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj();
@@ -64,7 +64,7 @@ public void handle(Context withinContext) throws Exception {
     Iterable files = ctm.getFiles();
     if (files != null) {
       // encoded filename/checksum of files, write into _files
-      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
+      try (BufferedWriter fileListWriter = writer(dataPath)) {
         for (String file : files) {
           fileListWriter.write(file + "\n");
         }
@@ -73,7 +73,7 @@ public void handle(Context withinContext) throws Exception {
     withinContext.createDmd(this).write();
   }
 
-  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
+  private BufferedWriter writer(Path dataPath) throws IOException {
     FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
     Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
     return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
index 61c5f37..0379f78 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
@@ -24,12 +24,12 @@ public class DefaultHandler extends AbstractHandler {
 
-  DefaultHandler(NotificationEvent event) {
-    super(event);
+  DefaultHandler(NotificationEvent event, Context withinContext) {
+    super(event, withinContext);
   }
 
   @Override
-  public void handle(Context withinContext) throws Exception {
+  public void handle() throws Exception {
     LOG.info("Dummy processing#{} message : {}", fromEventId(), event.getMessage());
     DumpMetaData dmd = withinContext.createDmd(this);
     dmd.setPayload(event.getMessage());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
index 3ad794e..57dcd6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
@@ -24,12 +24,12 @@ public class DropPartitionHandler extends AbstractHandler {
 
-  DropPartitionHandler(NotificationEvent event) {
-    super(event);
+  DropPartitionHandler(NotificationEvent event, Context withinContext) {
+    super(event, withinContext);
   }
 
   @Override
-  public void handle(Context withinContext) throws Exception {
+  public void handle() throws Exception {
     LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), event.getMessage());
     DumpMetaData dmd = withinContext.createDmd(this);
     dmd.setPayload(event.getMessage());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
index cae379b..c6f9928 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
@@ -24,12 +24,12 @@ public class DropTableHandler extends AbstractHandler {
 
-  DropTableHandler(NotificationEvent event) {
-    super(event);
+  DropTableHandler(NotificationEvent event, Context withinContext) {
+    super(event, withinContext);
   }
 
   @Override
-  public void handle(Context withinContext) throws Exception {
+  public void handle() throws Exception {
     LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage());
     DumpMetaData dmd = withinContext.createDmd(this);
     dmd.setPayload(event.getMessage());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
index 199145a..f56301b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
@@ -26,7 +26,7 @@ import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
 
 public interface EventHandler {
-  void handle(Context withinContext) throws Exception;
+  void handle() throws Exception;
 
   long fromEventId();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
index 53adea8..7c260d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
@@ -45,7 +45,7 @@ private EventHandlerFactory() {
   static void register(String event, Class handlerClazz) {
     try {
       Constructor constructor =
-          handlerClazz.getDeclaredConstructor(NotificationEvent.class);
+          handlerClazz.getDeclaredConstructor(NotificationEvent.class, EventHandler.Context.class);
       assert constructor != null;
       assert !Modifier.isPrivate(constructor.getModifiers());
       registeredHandlers.put(event, handlerClazz);
@@ -56,13 +56,13 @@ static void register(String event, Class handlerClazz) {
     }
   }
 
-  public static EventHandler handlerFor(NotificationEvent event) {
+  public static EventHandler handlerFor(NotificationEvent event, EventHandler.Context withinContext) {
     if (registeredHandlers.containsKey(event.getEventType())) {
       Class handlerClazz = registeredHandlers.get(event.getEventType());
       try {
         Constructor constructor =
-            handlerClazz.getDeclaredConstructor(NotificationEvent.class);
-        return constructor.newInstance(event);
+            handlerClazz.getDeclaredConstructor(NotificationEvent.class, EventHandler.Context.class);
+        return constructor.newInstance(event, withinContext);
       } catch (NoSuchMethodException | IllegalAccessException | InstantiationException
           | InvocationTargetException e) {
         // this should never happen. however we want to make sure we propagate the exception
         throw new RuntimeException(
@@ -70,6 +70,6 @@ public static EventHandler handlerFor(NotificationEvent event) {
             + " with the responsible class being " + handlerClazz.getCanonicalName(), e);
       }
     }
-    return new DefaultHandler(event);
+    return new DefaultHandler(event, withinContext);
   }
 }
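With the Context moved into the constructors, EventHandlerFactory now looks up a (NotificationEvent, EventHandler.Context) constructor both when a handler class is registered and when handlerFor instantiates it, falling back to DefaultHandler for unregistered event types. Any new handler therefore has to expose exactly that constructor shape; a hypothetical skeleton (not part of this patch, remaining EventHandler methods elided):

    import org.apache.hadoop.hive.metastore.api.NotificationEvent;

    // Hypothetical handler skeleton showing the constructor the factory reflects on.
    class ExampleEventHandler extends AbstractHandler {
      ExampleEventHandler(NotificationEvent event, Context withinContext) {
        super(event, withinContext);
      }

      @Override
      public void handle() throws Exception {
        // dump whatever the event carries, then write the event's DumpMetaData
        withinContext.createDmd(this).write();
      }

      // fromEventId()/toEventId()/dumpType() and the rest of the EventHandler
      // contract are elided here.
    }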
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
index 1346276..241fc4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
@@ -37,12 +37,12 @@ public class InsertHandler extends AbstractHandler {
 
-  InsertHandler(NotificationEvent event) {
-    super(event);
+  InsertHandler(NotificationEvent event, Context withinContext) {
+    super(event, withinContext);
   }
 
   @Override
-  public void handle(Context withinContext) throws Exception {
+  public void handle() throws Exception {
     InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
     org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(withinContext, insertMsg);
     Map partSpec = insertMsg.getPartitionKeyValues();
@@ -60,7 +60,7 @@ public void handle(Context withinContext) throws Exception {
 
     if (files != null) {
       // encoded filename/checksum of files, write into _files
-      try (BufferedWriter fileListWriter = writer(withinContext)) {
+      try (BufferedWriter fileListWriter = writer()) {
        for (String file : files) {
          fileListWriter.write(file + "\n");
        }
@@ -82,7 +82,7 @@ public void handle(Context withinContext) throws Exception {
     );
   }
 
-  private BufferedWriter writer(Context withinContext) throws IOException {
+  private BufferedWriter writer() throws IOException {
     Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
     Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
     FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
index 0526700..b554283 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
@@ -11,7 +11,7 @@ public void shouldNotAllowRegisteringEventsWhichCannotBeInstantiated() {
     class NonCompatibleEventHandler implements EventHandler {
       @Override
-      public void handle(Context withinContext) throws Exception {
+      public void handle() throws Exception {
 
       }
 
@@ -37,7 +37,7 @@ public long toEventId() {
   public void shouldProvideDefaultHandlerWhenNothingRegisteredForThatEvent() {
     EventHandler eventHandler = EventHandlerFactory.handlerFor(new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE,
-        "shouldGiveDefaultHandler", "s"));
+        "shouldGiveDefaultHandler", "s"), null);
     assertTrue(eventHandler instanceof DefaultHandler);
   }
diff --git a/ql/src/test/results/clientpositive/columnStatsUpdateForStatsOptimizer_2.q.out b/ql/src/test/results/clientpositive/columnStatsUpdateForStatsOptimizer_2.q.out
index af21343..a7c9b3f 100644
--- a/ql/src/test/results/clientpositive/columnStatsUpdateForStatsOptimizer_2.q.out
+++ b/ql/src/test/results/clientpositive/columnStatsUpdateForStatsOptimizer_2.q.out
@@ -292,10 +292,10 @@ Table Type:         MANAGED_TABLE
 Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\"}
 #### A masked pattern was here ####
-	numFiles            	2
+	numFiles            	0
 	numRows             	0
 	rawDataSize         	0
-	totalSize           	547
+	totalSize           	0
 #### A masked pattern was here ####
 
 # Storage Information
@@ -322,11 +322,11 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: calendar
-            Statistics: Num rows: 136 Data size: 547 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
             Select Operator
               expressions: month (type: int)
              outputColumnNames: month
-              Statistics: Num rows: 136 Data size: 547 Basic stats: COMPLETE Column stats: NONE
+              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
             Group By Operator
              aggregations: max(month)
              mode: hash