diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 4efc81d..6448743 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -867,6 +867,10 @@ public void testIncrementalInserts() throws IOException {
         + ".ptned WHERE b=2");
     verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2);
 
+    String[] unptn_data_after_ins = new String[] { "eleven", "twelve", "thirteen"};
+    run("INSERT INTO TABLE " + dbName + ".unptned_late values(\'"+ unptn_data_after_ins[2]+"\')");
+    verifySetup("SELECT a from " + dbName + ".unptned_late", unptn_data_after_ins);
+
     advanceDumpDir();
     run("REPL DUMP " + dbName + " FROM " + replDumpId);
     incrementalDumpLocn = getResult(0, 0);
@@ -878,6 +882,7 @@ public void testIncrementalInserts() throws IOException {
 
     verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1);
     verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data_after_ins);
   }
 
   @Test
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 7bb48a9..c398792 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -949,7 +949,8 @@ private static void createReplImportTasks(
         return; // silently return, table is newer than our replacement.
       }
       if (!replicationSpec.isMetadataOnly()) {
-        loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x); // repl-imports are replace-into
+        // repl-imports are replace-into unless the event is insert-into
+        loadTable(fromURI, table, !replicationSpec.isInsert(), new Path(fromURI), replicationSpec, x);
       } else {
         x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index cb1371c..5af008d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -622,6 +622,7 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
           qlPtns = Arrays.asList(db.getPartition(qlMdTable, partSpec, false));
         }
         Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
+        replicationSpec.setIsInsert(true); // Mark the replication type as insert into to avoid overwrite while import
         EximUtil.createExportDump(metaDataPath.getFileSystem(conf), metaDataPath, qlMdTable, qlPtns,
             replicationSpec);
         Iterable<String> files = insertMsg.getFiles();
@@ -1156,9 +1157,11 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       case EVENT_INSERT: {
         md = MessageFactory.getInstance().getDeserializer();
         InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
+        String actualDbName = ((dbName == null) || dbName.isEmpty() ? insertMessage.getDB() : dbName);
+        String actualTblName = ((tblName == null) || tblName.isEmpty() ?
+            insertMessage.getTable() : tblName);
         // Piggybacking in Import logic for now
-        return analyzeTableLoad(
-            insertMessage.getDB(), insertMessage.getTable(), locn, precursor, dbsUpdated, tablesUpdated);
+        return analyzeTableLoad(actualDbName, actualTblName, locn, precursor, dbsUpdated, tablesUpdated);
       }
       case EVENT_UNKNOWN: {
         break;
@@ -1395,7 +1398,7 @@ private ReplicationSpec getNewReplicationSpec() throws SemanticException {
   // Use for specifying object state as well as event state
   private ReplicationSpec getNewReplicationSpec(String evState, String objState)
       throws SemanticException {
-    return new ReplicationSpec(true, false, evState, objState, false, true);
+    return new ReplicationSpec(true, false, evState, objState, false, true, false);
   }
 
   // Use for replication states focussed on event only, where the obj state will be the event state
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index be17ffa..48362a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -44,7 +44,7 @@
   private String currStateId = null;
   private boolean isNoop = false;
   private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
-
+  private boolean isInsert = false; // default is that the import mode is replace-into
 
   // Key definitions related to replication
   public enum KEY {
@@ -53,6 +53,7 @@
     CURR_STATE_ID("repl.last.id"),
     NOOP("repl.noop"),
     LAZY("repl.lazy"),
+    IS_INSERT("repl.is.insert")
     ;
 
     private final String keyName;
@@ -134,13 +135,15 @@ public ReplicationSpec(){
   }
 
   public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
-      String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isLazy) {
+      String eventReplicationState, String currentReplicationState,
+      boolean isNoop, boolean isLazy, boolean isInsert) {
     this.isInReplicationScope = isInReplicationScope;
     this.isMetadataOnly = isMetadataOnly;
     this.eventId = eventReplicationState;
     this.currStateId = currentReplicationState;
     this.isNoop = isNoop;
     this.isLazy = isLazy;
+    this.isInsert = isInsert;
   }
 
   public ReplicationSpec(Function<String, String> keyFetcher) {
@@ -159,6 +162,7 @@ public ReplicationSpec(Function<String, String> keyFetcher) {
     this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString());
     this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
     this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString()));
+    this.isInsert = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_INSERT.toString()));
   }
 
   /**
@@ -292,6 +296,15 @@ public void setIsMetadataOnly(boolean isMetadataOnly){
   }
 
   /**
+   * @return true if this statement refers to insert-into operation.
+   */
+  public boolean isInsert(){ return isInsert; }
+
+  public void setIsInsert(boolean isInsert){
+    this.isInsert = isInsert;
+  }
+
+  /**
    * @return the replication state of the event that spawned this statement
    */
   public String getReplicationState() {
@@ -357,6 +370,8 @@ public String get(KEY key) {
       return String.valueOf(isNoop());
     case LAZY:
       return String.valueOf(isLazy());
+    case IS_INSERT:
+      return String.valueOf(isInsert());
     }
     return null;
   }
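
Reviewer note (not part of the patch): below is a minimal sketch of how the new isInsert flag is intended to round-trip, assuming the patched ReplicationSpec above is on the classpath. The constructor, setIsInsert/isInsert, KEY.IS_INSERT, and get(KEY) all come from the diff; the sketch class itself and its name are hypothetical.

import org.apache.hadoop.hive.ql.parse.ReplicationSpec;

// Hypothetical illustration only; not shipped with the patch.
public class ReplIsInsertSketch {
  public static void main(String[] args) {
    // Dump side: for an INSERT event, ReplicationSemanticAnalyzer marks the spec
    // before EximUtil.createExportDump writes the _metadata file, so the
    // repl.is.insert key travels along with the other repl.* state.
    ReplicationSpec spec = new ReplicationSpec();
    spec.setIsInsert(true);
    System.out.println(spec.get(ReplicationSpec.KEY.IS_INSERT)); // prints "true"

    // Load side: ImportSemanticAnalyzer derives loadTable's "replace" argument
    // from the flag, so an insert event appends instead of overwriting the table.
    boolean replace = !spec.isInsert(); // false => append semantics for inserts
    System.out.println(replace);        // prints "false"
  }
}

Design-wise this keeps the export format unchanged: the flag is just one more repl.* key in the dumped metadata, read back through the same keyFetcher-based constructor that already handles repl.noop and repl.lazy.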