diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index c9092b1..2c98133 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -1013,6 +1013,132 @@ public void testViewsReplication() throws IOException {
   }
 
   @Test
+  public void testTruncateTable() throws IOException {
+    String testName = "truncateTable";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] unptn_data = new String[] { "eleven", "twelve" };
+    String[] empty = new String[] {};
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+
+    run("TRUNCATE TABLE " + dbName + ".unptned");
+    verifySetup("SELECT a from " + dbName + ".unptned", empty);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned", empty);
+
+    String[] unptn_data_after_ins = new String[] { "thirteen" };
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_after_ins[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_after_ins);
+  }
+
+  @Test
+  public void testTruncatePartitionedTable() throws IOException {
+    String testName = "truncatePartitionedTable";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".ptned_1(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned_2(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+
+    String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+    String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+    String[] empty = new String[] {};
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[2] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[2] + "')");
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[2] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[2] + "')");
+
+    verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=10) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=20) ORDER BY a", ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20) ORDER BY a", ptn_data_2);
+
+    run("TRUNCATE TABLE " + dbName + ".ptned_1 PARTITION(b=2)");
+    verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=2)", empty);
+
+    run("TRUNCATE TABLE " + dbName + ".ptned_2");
+    verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=10)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=20)", empty);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
"_dupe FROM '" + incrementalDumpLocn + "'"); + verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1); + verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2)", empty); + verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10)", empty); + verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20)", empty); + } + + @Test public void testStatus() throws IOException { // first test ReplStateMap functionality Map cmap = new ReplStateMap(); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java index b0defb5..868e5a5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java @@ -75,19 +75,17 @@ public void onDropTable (DropTableEvent tableEvent) throws MetaException { } /** - * @param add partition event - * @throws MetaException - */ - - /** * @param tableEvent alter table event * @throws MetaException */ public void onAlterTable (AlterTableEvent tableEvent) throws MetaException { } - public void onAddPartition (AddPartitionEvent partitionEvent) - throws MetaException { + /** + * @param partitionEvent add partition event + * @throws MetaException + */ + public void onAddPartition (AddPartitionEvent partitionEvent) throws MetaException { } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index f137819..43294c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4677,6 +4677,9 @@ private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws H } } } + + // Update the table stats using alterTable/Partition operations + updateTableStats(db, table, partSpec); } catch (Exception e) { throw new HiveException(e, ErrorMsg.GENERIC_ERROR); } @@ -4714,39 +4717,49 @@ private int exchangeTablePartition(Hive db, if (table.isPartitioned()) { for (Partition partition : db.getPartitions(table)) { locations.add(partition.getDataLocation()); - EnvironmentContext environmentContext = new EnvironmentContext(); - if (needToUpdateStats(partition.getParameters(), environmentContext)) { - db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext); - } } } else { locations.add(table.getPath()); - EnvironmentContext environmentContext = new EnvironmentContext(); - if (needToUpdateStats(table.getParameters(), environmentContext)) { - db.alterTable(table.getDbName()+"."+table.getTableName(), table, environmentContext); - } } } else { for (Partition partition : db.getPartitionsByNames(table, partSpec)) { locations.add(partition.getDataLocation()); - EnvironmentContext environmentContext = new EnvironmentContext(); - if (needToUpdateStats(partition.getParameters(), environmentContext)) { + } + } + return locations; + } + + private void updateTableStats(Hive db, Table table, Map partSpec) + throws HiveException, InvalidOperationException { + if (partSpec == null) { + if (table.isPartitioned()) { + for (Partition partition : db.getPartitions(table)) { + EnvironmentContext environmentContext = new EnvironmentContext(); + updateStatsForTruncate(partition.getParameters(), environmentContext); db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext); } + } else { + EnvironmentContext 
+        updateStatsForTruncate(table.getParameters(), environmentContext);
+        db.alterTable(table.getDbName()+"."+table.getTableName(), table, environmentContext);
+      }
+    } else {
+      for (Partition partition : db.getPartitionsByNames(table, partSpec)) {
+        EnvironmentContext environmentContext = new EnvironmentContext();
+        updateStatsForTruncate(partition.getParameters(), environmentContext);
+        db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
       }
     }
-    return locations;
+    return;
   }
 
-  private boolean needToUpdateStats(Map<String, String> props, EnvironmentContext environmentContext) {
+  private void updateStatsForTruncate(Map<String, String> props, EnvironmentContext environmentContext) {
     if (null == props) {
-      return false;
+      return;
     }
-    boolean statsPresent = false;
     for (String stat : StatsSetupConst.supportedStats) {
       String statVal = props.get(stat);
-      if (statVal != null && Long.parseLong(statVal) > 0) {
-        statsPresent = true;
+      if (statVal != null) {
         //In the case of truncate table, we set the stats to be 0.
         props.put(stat, "0");
       }
@@ -4756,7 +4769,7 @@ private boolean needToUpdateStats(Map<String, String> props, EnvironmentContext e
     environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
     //then invalidate column stats
     StatsSetupConst.clearColumnStatsState(props);
-    return statsPresent;
+    return;
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 245c483..ea00f29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -505,7 +505,7 @@ private static void checkTable(Table table, ImportTableDesc tableDesc, Replicati
       }
     }
 
-    // Next, we verify that the destination table is not offline, a view, or a non-native table
+    // Next, we verify that the destination table is not offline or a non-native table
     EximUtil.validateTable(table);
 
     // If the import statement specified that we're importing to an external
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 011df19..9d4c86c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
@@ -62,6 +63,7 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.IOUtils;
@@ -113,8 +115,10 @@
     EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"),
     EVENT_ALTER_TABLE("EVENT_ALTER_TABLE"),
     EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"),
+    EVENT_TRUNCATE_TABLE("EVENT_TRUNCATE_TABLE"),
     EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
     EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
+    EVENT_TRUNCATE_PARTITION("EVENT_TRUNCATE_PARTITION"),
     EVENT_INSERT("EVENT_INSERT"),
     EVENT_UNKNOWN("EVENT_UNKNOWN");
 
@@ -547,30 +551,35 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
         AlterTableMessage atm = md.getAlterTableMessage(ev.getMessage());
         org.apache.hadoop.hive.metastore.api.Table tobjBefore = atm.getTableObjBefore();
         org.apache.hadoop.hive.metastore.api.Table tobjAfter = atm.getTableObjAfter();
+        DUMPTYPE dumpType = DUMPTYPE.EVENT_ALTER_TABLE;
 
         if (tobjBefore.getDbName().equals(tobjAfter.getDbName()) &&
-            tobjBefore.getTableName().equals(tobjAfter.getTableName())){
-          // regular alter scenario
-          replicationSpec.setIsMetadataOnly(true);
+            tobjBefore.getTableName().equals(tobjAfter.getTableName())) {
 
           Table qlMdTableAfter = new Table(tobjAfter);
           Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
+
+          // If truncate operation, then need to remove the data files too.
+          if (isTruncateOp(qlMdTableAfter.getPath())) {
+            dumpType = DUMPTYPE.EVENT_TRUNCATE_TABLE;
+          }
+
+          // regular alter scenario. Even truncate table will perform a metadata only alter and then truncate.
+          replicationSpec.setIsMetadataOnly(true);
+
           EximUtil.createExportDump(
               metaDataPath.getFileSystem(conf),
               metaDataPath,
               qlMdTableAfter,
               null,
               replicationSpec);
-
-          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_TABLE, evid, evid, cmRoot);
-          dmd.setPayload(ev.getMessage());
-          dmd.write();
         } else {
           // rename scenario
-          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_TABLE, evid, evid, cmRoot);
-          dmd.setPayload(ev.getMessage());
-          dmd.write();
+          dumpType = DUMPTYPE.EVENT_RENAME_TABLE;
         }
+        DumpMetaData dmd = new DumpMetaData(evRoot, dumpType, evid, evid, cmRoot);
+        dmd.setPayload(ev.getMessage());
+        dmd.write();
         break;
       }
       case MessageFactory.ALTER_PARTITION_EVENT : {
@@ -579,6 +588,7 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
         org.apache.hadoop.hive.metastore.api.Table tblObj = apm.getTableObj();
         org.apache.hadoop.hive.metastore.api.Partition pobjBefore = apm.getPtnObjBefore();
         org.apache.hadoop.hive.metastore.api.Partition pobjAfter = apm.getPtnObjAfter();
+        DUMPTYPE dumpType = DUMPTYPE.EVENT_ALTER_PARTITION;
 
         boolean renameScenario = false;
         Iterator<String> beforeValIter = pobjBefore.getValuesIterator();
@@ -591,29 +601,34 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
         }
 
         if (!renameScenario){
-          // regular partition alter
-          replicationSpec.setIsMetadataOnly(true);
           Table qlMdTable = new Table(tblObj);
           List<Partition> qlPtns = new ArrayList<Partition>();
           qlPtns.add(new Partition(qlMdTable, pobjAfter));
           Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
+
+          // If truncate operation, then need to remove the data files too.
+          if (isTruncateOp(qlPtns.get(0).getDataLocation())) {
+            dumpType = DUMPTYPE.EVENT_TRUNCATE_PARTITION;
+          }
+
+          // regular partition alter. Even truncate partition will perform a metadata only alter and then truncate.
+          replicationSpec.setIsMetadataOnly(true);
+
           EximUtil.createExportDump(
               metaDataPath.getFileSystem(conf),
               metaDataPath,
               qlMdTable,
               qlPtns,
               replicationSpec);
-          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_PARTITION, evid, evid, cmRoot);
-          dmd.setPayload(ev.getMessage());
-          dmd.write();
-          break;
         } else {
           // rename scenario
-          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_PARTITION, evid, evid, cmRoot);
-          dmd.setPayload(ev.getMessage());
-          dmd.write();
-          break;
+          dumpType = DUMPTYPE.EVENT_RENAME_PARTITION;
         }
+
+        DumpMetaData dmd = new DumpMetaData(evRoot, dumpType, evid, evid, cmRoot);
+        dmd.setPayload(ev.getMessage());
+        dmd.write();
+        break;
       }
       case MessageFactory.INSERT_EVENT: {
         InsertMessage insertMsg = md.getInsertMessage(ev.getMessage());
@@ -1118,6 +1133,26 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
           }
         }
       }
+      case EVENT_TRUNCATE_TABLE: {
+        AlterTableMessage truncateTableMessage = md.getAlterTableMessage(dmd.getPayload());
+        String actualDbName = ((dbName == null) || dbName.isEmpty() ? truncateTableMessage.getDB() : dbName);
+        String actualTblName = ((tblName == null) || tblName.isEmpty() ? truncateTableMessage.getTable() : tblName);
+
+        // First perform alter table, just in case the current alter happened after the truncate table
+        List<Task<? extends Serializable>> tasks =
+            analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
+
+        TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
+            actualDbName + "." + actualTblName, null);
+        Task<DDLWork> truncateTableTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf);
+        if (precursor != null) {
+          precursor.addDependentTask(truncateTableTask);
+        }
+        tasks.add(truncateTableTask);
+        LOG.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(), truncateTableDesc.getTableName());
+        dbsUpdated.put(actualDbName, dmd.getEventTo());
+        return tasks;
+      }
       case EVENT_ALTER_PARTITION: {
         return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
       }
@@ -1159,6 +1194,42 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         tablesUpdated.put(tableName, dmd.getEventTo());
         return tasks;
       }
+      case EVENT_TRUNCATE_PARTITION: {
+        AlterPartitionMessage truncatePtnMessage = md.getAlterPartitionMessage(dmd.getPayload());
+        String actualDbName = ((dbName == null) || dbName.isEmpty() ? truncatePtnMessage.getDB() : dbName);
+        String actualTblName = ((tblName == null) || tblName.isEmpty() ? truncatePtnMessage.getTable() : tblName);
+
+        Map<String, String> partSpec = new LinkedHashMap<String, String>();
+        try {
+          org.apache.hadoop.hive.metastore.api.Table tblObj = truncatePtnMessage.getTableObj();
+          org.apache.hadoop.hive.metastore.api.Partition pobjAfter = truncatePtnMessage.getPtnObjAfter();
+          Iterator<String> afterValIter = pobjAfter.getValuesIterator();
+          for (FieldSchema fs : tblObj.getPartitionKeys()){
+            partSpec.put(fs.getName(), afterValIter.next());
+          }
+        } catch (Exception e) {
+          if (!(e instanceof SemanticException)){
+            throw new SemanticException("Error reading message members", e);
+          } else {
+            throw (SemanticException)e;
+          }
+        }
+
+        // First perform alter partition, just in case the current alter happened after the truncate partition
+        List<Task<? extends Serializable>> tasks =
+            analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
+
+        TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
+            actualDbName + "." + actualTblName, partSpec);
+        Task<DDLWork> truncatePtnTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf);
+        if (precursor != null) {
+          precursor.addDependentTask(truncatePtnTask);
+        }
+        tasks.add(truncatePtnTask);
+        LOG.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(), truncateTableDesc.getTableName());
+        dbsUpdated.put(actualDbName, dmd.getEventTo());
+        return tasks;
+      }
       case EVENT_INSERT: {
         md = MessageFactory.getInstance().getDeserializer();
         InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
@@ -1214,6 +1285,20 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     return partSpecs;
   }
 
+  private boolean isTruncateOp(Path location) throws SemanticException {
+    try {
+      FileSystem fs = location.getFileSystem(conf);
+      FileStatus[] statuses = fs.listStatus(location, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      if ((statuses == null) || (statuses.length == 0)) {
+        return true;
+      }
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.IO_ERROR.getMsg(), e);
+    }
+
+    return false;
+  }
+
   private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
       throws SemanticException {
     try {