diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExport.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
similarity index 63%
rename from itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExport.java
rename to itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
index 1c7c844..1f19dfd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExport.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
@@ -31,14 +31,16 @@ import java.io.IOException;
 
-public class TestExport {
+public class TestExportImport {
 
-  protected static final Logger LOG = LoggerFactory.getLogger(TestExport.class);
-  private static WarehouseInstance hiveWarehouse;
+  protected static final Logger LOG = LoggerFactory.getLogger(TestExportImport.class);
+  private static WarehouseInstance srcHiveWarehouse;
+  private static WarehouseInstance destHiveWarehouse;
 
   @Rule
   public final TestName testName = new TestName();
   private String dbName;
+  private String replDbName;
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
@@ -46,18 +48,22 @@ public static void classLevelSetup() throws Exception {
     conf.set("dfs.client.use.datanode.hostname", "true");
     MiniDFSCluster miniDFSCluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    hiveWarehouse = new WarehouseInstance(LOG, miniDFSCluster, false);
+    srcHiveWarehouse = new WarehouseInstance(LOG, miniDFSCluster, false);
+    destHiveWarehouse = new WarehouseInstance(LOG, miniDFSCluster, false);
   }
 
   @AfterClass
   public static void classLevelTearDown() throws IOException {
-    hiveWarehouse.close();
+    srcHiveWarehouse.close();
+    destHiveWarehouse.close();
   }
 
   @Before
   public void setup() throws Throwable {
     dbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
-    hiveWarehouse.run("create database " + dbName);
+    replDbName = dbName + "_dupe";
+    srcHiveWarehouse.run("create database " + dbName);
+    destHiveWarehouse.run("create database " + replDbName);
   }
 
   @Test
@@ -65,7 +71,7 @@ public void shouldExportImportATemporaryTable() throws Throwable {
     String path = "hdfs:///tmp/" + dbName + "/";
     String exportPath = "'" + path + "'";
     String importDataPath = path + "/data";
-    hiveWarehouse
+    srcHiveWarehouse
         .run("use " + dbName)
         .run("create temporary table t1 (i int)")
         .run("insert into table t1 values (1),(2)")
@@ -75,4 +81,20 @@ public void shouldExportImportATemporaryTable() throws Throwable {
         .run("select * from t2")
         .verifyResults(new String[] { "1", "2" });
   }
+
+  @Test
+  public void dataImportAfterMetadataOnlyImport() throws Throwable {
+    String path = "hdfs:///tmp/" + dbName + "/";
+    String exportMDPath = "'" + path + "1/'";
+    String exportDataPath = "'" + path + "2/'";
+    srcHiveWarehouse.run("create table " + dbName + ".t1 (i int)")
+        .run("insert into table " + dbName + ".t1 values (1),(2)")
+        .run("export table " + dbName + ".t1 to " + exportMDPath + " for metadata replication('1')")
+        .run("export table " + dbName + ".t1 to " + exportDataPath + " for replication('2')");
+
+    destHiveWarehouse.run("import table " + replDbName + ".t1 from " + exportMDPath)
+        .run("import table " + replDbName + ".t1 from " + exportDataPath)
+        .run("select * from " + replDbName + ".t1")
+        .verifyResults(new String[] { "1", "2" });
+  }
 }
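The new test drives the scenario through the WarehouseInstance harness; the same flow can be sketched over plain JDBC. This is a hypothetical rendering, not part of the patch: the connection URL, database names (src, dst) and dump paths are placeholders.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class MetadataThenDataImport {
      public static void main(String[] args) throws Exception {
        try (Connection conn =
                DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
            Statement stmt = conn.createStatement()) {
          // Metadata-only dump tagged with replication state '1'.
          stmt.execute("export table src.t1 to '/tmp/dump/1/' for metadata replication('1')");
          // Full dump (metadata + data) tagged with replication state '2'.
          stmt.execute("export table src.t1 to '/tmp/dump/2/' for replication('2')");
          // The first import creates the table with repl state 1; the second must be
          // allowed to replace it because its state (2) is not older than the recorded one (1).
          stmt.execute("import table dst.t1 from '/tmp/dump/1/'");
          stmt.execute("import table dst.t1 from '/tmp/dump/2/'");
        }
      }
    }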
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index d3af0ed..3ebd3cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -170,7 +170,7 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Ex
 
   private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException {
     ReplicationSpec rspec = getNewReplicationSpec(eventId.toString(), eventId.toString());
-    rspec.setIsIncrementalDump(true);
+    rspec.setReplSpecType(ReplicationSpec.Type.INCREMENTAL_DUMP);
     return rspec;
   }
 
@@ -260,8 +260,7 @@ private ReplicationSpec getNewReplicationSpec() throws TException {
   }
 
   private ReplicationSpec getNewReplicationSpec(String evState, String objState) {
-    return new ReplicationSpec(true, false, false, evState, objState,
-        false, true, true);
+    return new ReplicationSpec(true, false, evState, objState, false, true, true);
   }
 
   private String getNextDumpDir() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 74144ac..b8c6ea9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -46,6 +46,14 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     } else {
       replicationSpec = new ReplicationSpec();
     }
+    if (replicationSpec.getCurrentReplicationState() == null) {
+      try {
+        long currentEventId = db.getMSC().getCurrentNotificationEventId().getEventId();
+        replicationSpec.setCurrentReplicationState(String.valueOf(currentEventId));
+      } catch (Exception e) {
+        throw new SemanticException("Error when getting current notification event ID", e);
+      }
+    }
 
     // initialize source table/partition
     TableSpec ts;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 606a414..f8d5c8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -139,7 +139,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     }
 
     // parsing statement is now done, on to logic.
-    tableExists = prepareImport(
+    tableExists = prepareImport(true,
         isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor,
         parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
         new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx),
@@ -175,7 +175,7 @@ private void parsePartitionSpec(ASTNode tableNode, LinkedHashMap<String, String>
     }
   }
 
-  public static boolean prepareImport(
+  public static boolean prepareImport(boolean isImportCmd,
       boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnPrecursor,
       String parsedLocation, String parsedTableName, String parsedDbName,
       LinkedHashMap<String, String> parsedPartSpec,
@@ -205,6 +205,10 @@ public static boolean prepareImport(
       return false;
     }
 
+    if (isImportCmd) {
+      replicationSpec.setReplSpecType(ReplicationSpec.Type.IMPORT);
+    }
+
     String dbname = SessionState.get().getCurrentDatabase();
     if ((parsedDbName !=null) && (!parsedDbName.isEmpty())){
       // If the parsed statement contained a db.tablename specification, prefer that.
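Note that the ExportSemanticAnalyzer hunk above fetches the metastore's current notification event ID only when the spec carries no replication state yet, so a state fixed earlier by the FOR [METADATA] REPLICATION('<id>') clause (see the ReplicationSpec.init() change below) is not overwritten. A minimal, self-contained sketch of that guard, with hypothetical names standing in for ReplicationSpec and the metastore client:

    import java.util.function.LongSupplier;

    public class CurrentStateGuard {
      private String currStateId; // stands in for ReplicationSpec.currStateId

      // Mirrors the added analyzeInternal() logic: consult the metastore's current
      // notification event ID only when no replication state was set beforehand.
      void ensureCurrentState(LongSupplier currentEventId) {
        if (currStateId == null) {
          currStateId = String.valueOf(currentEventId.getAsLong());
        }
      }

      public static void main(String[] args) {
        CurrentStateGuard spec = new CurrentStateGuard();
        spec.ensureCurrentState(() -> 42L);   // unset: takes the metastore's event ID
        System.out.println(spec.currStateId); // 42
        spec.ensureCurrentState(() -> 99L);   // already set: left untouched
        System.out.println(spec.currStateId); // still 42
      }
    }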
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 235a44c..f257933 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -38,12 +38,12 @@
   private boolean isInReplicationScope = false; // default is that it's not in a repl scope
   private boolean isMetadataOnly = false; // default is full export/import, not metadata-only
-  private boolean isIncrementalDump = false; // default is replv2 bootstrap dump or replv1 export or import/load.
   private String eventId = null;
   private String currStateId = null;
   private boolean isNoop = false;
   private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
   private boolean isReplace = true; // default is that the import mode is insert overwrite
+  private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT
 
   // Key definitions related to replication
   public enum KEY {
@@ -66,9 +66,9 @@ public String toString(){
     }
   }
 
-  public enum SCOPE { NO_REPL, MD_ONLY, REPL };
+  public enum SCOPE { NO_REPL, MD_ONLY, REPL }
 
-  static private Collator collator = Collator.getInstance();
+  public enum Type { DEFAULT, INCREMENTAL_DUMP, IMPORT }
 
   /**
    * Constructor to construct spec based on either the ASTNode that
@@ -106,28 +106,27 @@
   public ReplicationSpec(){ }
 
   public ReplicationSpec(String fromId, String toId) {
-    this(true, false, false, fromId, toId, false, true, false);
+    this(true, false, fromId, toId, false, true, false);
   }
 
   public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
-      boolean isIncrementalDump, String eventReplicationState, String currentReplicationState,
+      String eventReplicationState, String currentReplicationState,
       boolean isNoop, boolean isLazy, boolean isReplace) {
     this.isInReplicationScope = isInReplicationScope;
     this.isMetadataOnly = isMetadataOnly;
-    this.isIncrementalDump = isIncrementalDump;
    this.eventId = eventReplicationState;
     this.currStateId = currentReplicationState;
     this.isNoop = isNoop;
     this.isLazy = isLazy;
     this.isReplace = isReplace;
+    this.specType = Type.DEFAULT;
   }
 
   public ReplicationSpec(Function<String, String> keyFetcher) {
     String scope = keyFetcher.apply(ReplicationSpec.KEY.REPL_SCOPE.toString());
     this.isInReplicationScope = false;
     this.isMetadataOnly = false;
-    this.isIncrementalDump = false;
+    this.specType = Type.DEFAULT;
     if (scope != null) {
       if (scope.equalsIgnoreCase("metadata")) {
         this.isMetadataOnly = true;
@@ -155,7 +154,7 @@ public static boolean isApplicable(ASTNode node){
    * @param replacementReplState Replacement-candidate state
    * @return whether or not a provided replacement candidate is newer(or equal) to the existing object state or not
    */
-  public static boolean allowReplacement(String currReplState, String replacementReplState){
+  public boolean allowReplacement(String currReplState, String replacementReplState){
     if ((currReplState == null) || (currReplState.isEmpty())) {
       // if we have no replication state on record for the obj, allow replacement.
       return true;
     }
@@ -171,7 +170,16 @@
     long currReplStateLong = Long.parseLong(currReplState.replaceAll("\\D",""));
     long replacementReplStateLong = Long.parseLong(replacementReplState.replaceAll("\\D",""));
 
-    return ((currReplStateLong - replacementReplStateLong) < 0);
+    // Failure handling of the IMPORT and REPL LOAD commands is different.
+    // IMPORT will set the last repl ID before copying data files, and hence needs to allow
+    // replacement if the same dump is loaded twice after a failed copy in a previous attempt.
+    // But REPL LOAD sets the last repl ID only after a successful copy of the data files, and
+    // hence must not allow the same event to be applied twice.
+    if (specType == Type.IMPORT) {
+      return (currReplStateLong <= replacementReplStateLong);
+    } else {
+      return (currReplStateLong < replacementReplStateLong);
+    }
   }
 
   /**
@@ -211,9 +219,17 @@ private void init(ASTNode node){
       // -> ^(TOK_REPLICATION $replId $isMetadataOnly)
       isInReplicationScope = true;
       eventId = PlanUtils.stripQuotes(node.getChild(0).getText());
-      if (node.getChildCount() > 1){
-        if (node.getChild(1).getText().toLowerCase().equals("metadata")) {
-          isMetadataOnly= true;
+      if ((node.getChildCount() > 1)
+          && node.getChild(1).getText().toLowerCase().equals("metadata")) {
+        isMetadataOnly= true;
+        try {
+          if (Long.parseLong(eventId) >= 0) {
+            // For a metadata-only dump, the state of the dump shouldn't be the latest event id,
+            // as the data is not dumped yet and shall be dumped by a future export.
+            currStateId = eventId;
+          }
+        } catch (Exception ex) {
+          // Ignore the exception and fall through to the default currentStateId
         }
       }
     }
@@ -226,21 +242,21 @@ public static String getLastReplicatedStateFromParameters(Map<String, String> m)
   }
 
   /**
-   * @return true if this statement is being run for the purposes of replication
+   * @return the type of this replication spec (DEFAULT, INCREMENTAL_DUMP or IMPORT).
    */
-  public boolean isInReplicationScope(){
-    return isInReplicationScope;
+  public Type getReplSpecType(){
+    return this.specType;
   }
 
-  /**
-   * @return true if this statement refers to incremental dump operation.
-   */
-  public boolean isIncrementalDump(){
-    return isIncrementalDump;
+  public void setReplSpecType(Type specType){
+    this.specType = specType;
   }
 
-  public void setIsIncrementalDump(boolean isIncrementalDump){
-    this.isIncrementalDump = isIncrementalDump;
+  /**
+   * @return true if this statement is being run for the purposes of replication
+   */
+  public boolean isInReplicationScope(){
+    return isInReplicationScope;
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
index 6a05ea4..8b43110 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
@@ -38,7 +38,6 @@ public ReplicationSpec fromMetaStore() throws HiveException {
         new ReplicationSpec(
             true,
             false,
-            false,
             "replv2",
             "will-be-set",
             false,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 5eae35a..7533a39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -93,8 +93,6 @@ public AuthEntities write() throws SemanticException {
 
   private PartitionIterable partitions() throws SemanticException {
     try {
-      long currentEventId = db.getMSC().getCurrentNotificationEventId().getEventId();
-      replicationSpec.setCurrentReplicationState(String.valueOf(currentEventId));
       if (tableSpec.tableHandle.isPartitioned()) {
        if (tableSpec.specType == TableSpec.SpecType.TABLE_ONLY) {
           // TABLE-ONLY, fetch partitions if regular export, don't if metadata-only
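The comparison change in allowReplacement(...) above is the behavioral core of the patch: since prepareImport(...) now tags IMPORT-command specs as Type.IMPORT, a retried import from the same dump passes the check, while a replayed REPL LOAD event still fails it. A condensed, self-contained rendering of that decision (the scaffolding class is illustrative, not Hive code):

    public class AllowReplacementSketch {
      enum Type { DEFAULT, INCREMENTAL_DUMP, IMPORT }

      // IMPORT sets the last repl ID before copying data, so a retry from the same
      // dump must be allowed (<=); REPL LOAD sets it only after a successful copy,
      // so replaying the same event is rejected (<).
      static boolean allowReplacement(Type specType, long curr, long replacement) {
        return (specType == Type.IMPORT) ? (curr <= replacement) : (curr < replacement);
      }

      public static void main(String[] args) {
        System.out.println(allowReplacement(Type.IMPORT, 2, 2));  // true: IMPORT retry of same dump
        System.out.println(allowReplacement(Type.DEFAULT, 2, 2)); // false: REPL LOAD replay of same event
      }
    }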
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index 2c7414f..ce83523 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -44,7 +44,8 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
       if (additionalPropertiesProvider.isInReplicationScope()) {
         // Current replication state must be set on the Partition object only for bootstrap dump.
         // Event replication State will be null in case of bootstrap dump.
-        if (!additionalPropertiesProvider.isIncrementalDump()) {
+        if (additionalPropertiesProvider.getReplSpecType()
+            != ReplicationSpec.Type.INCREMENTAL_DUMP) {
           partition.putToParameters(
               ReplicationSpec.KEY.CURR_STATE_ID.toString(),
               additionalPropertiesProvider.getCurrentReplicationState());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index c443e53..92408a4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -70,7 +70,8 @@ private Table addPropertiesToTable(Table table, ReplicationSpec additionalProper
     if (additionalPropertiesProvider.isInReplicationScope()) {
       // Current replication state must be set on the Table object only for bootstrap dump.
       // Event replication State will be null in case of bootstrap dump.
-      if (!additionalPropertiesProvider.isIncrementalDump()) {
+      if (additionalPropertiesProvider.getReplSpecType()
+          != ReplicationSpec.Type.INCREMENTAL_DUMP) {
         table.putToParameters(
             ReplicationSpec.KEY.CURR_STATE_ID.toString(),
             additionalPropertiesProvider.getCurrentReplicationState());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
index 2c5c2d9..4ba6256 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
@@ -45,7 +45,7 @@
       // REPL LOAD is not partition level. It is always DB or table level. So, passing null for partition specs.
       // Also, REPL LOAD doesn't support external table and hence no location set as well.
-      ImportSemanticAnalyzer.prepareImport(false, false, false,
+      ImportSemanticAnalyzer.prepareImport(false, false, false, false,
           (context.precursor != null), null, context.tableName, context.dbName,
           null, context.location, x, updatedMetadata);
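The PartitionSerializer and TableSerializer hunks above share one gate: the current replication state is stamped onto Table/Partition parameters only for bootstrap dumps, since incremental events carry their own event-level state. A self-contained sketch of that gate (class name and the literal map key are illustrative; the real code resolves the key via ReplicationSpec.KEY.CURR_STATE_ID):

    import java.util.HashMap;
    import java.util.Map;

    public class CurrStateStamp {
      enum Type { DEFAULT, INCREMENTAL_DUMP, IMPORT }

      // Stamp the current replication state unless this is an incremental dump,
      // mirroring the condition both serializers now test against the spec type.
      static void stampForBootstrapOnly(Map<String, String> params, Type specType, String currState) {
        if (specType != Type.INCREMENTAL_DUMP) {
          params.put("repl.last.id", currState); // key name assumed for illustration
        }
      }

      public static void main(String[] args) {
        Map<String, String> bootstrap = new HashMap<>();
        stampForBootstrapOnly(bootstrap, Type.DEFAULT, "42");
        System.out.println(bootstrap);   // {repl.last.id=42}

        Map<String, String> incremental = new HashMap<>();
        stampForBootstrapOnly(incremental, Type.INCREMENTAL_DUMP, "42");
        System.out.println(incremental); // {} : event-level state is used instead
      }
    }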