commit 8d3c4ee552ff5601f3a4a4800d59f0407fcf4e25
Author: Daniel Dai
Date:   Mon Aug 21 11:40:00 2017 -0700

    HIVE-17366: Constraint replication in bootstrap

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index b020351..e0b5f86 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -2653,16 +2653,15 @@ public void testConstraints() throws IOException {
     LOG.info("Dumped to {} with id {}", replDumpLocn, replDumpId);
     run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
 
-    // bootstrap replication for constraint is not implemented. Will verify it works once done
     try {
       List<SQLPrimaryKey> pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(dbName+ "_dupe" , "tbl1"));
-      assertTrue(pks.isEmpty());
+      assertEquals(pks.size(), 1);
       List<SQLUniqueConstraint> uks = metaStoreClientMirror.getUniqueConstraints(new UniqueConstraintsRequest(dbName+ "_dupe" , "tbl1"));
-      assertTrue(uks.isEmpty());
+      assertEquals(uks.size(), 1);
       List<SQLForeignKey> fks = metaStoreClientMirror.getForeignKeys(new ForeignKeysRequest(null, null, dbName+ "_dupe" , "tbl2"));
-      assertTrue(fks.isEmpty());
+      assertEquals(fks.size(), 1);
       List<SQLNotNullConstraint> nns = metaStoreClientMirror.getNotNullConstraints(new NotNullConstraintsRequest(dbName+ "_dupe" , "tbl3"));
-      assertTrue(nns.isEmpty());
+      assertEquals(nns.size(), 1);
     } catch (TException te) {
       assertNull(te);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 34b6737..9130160 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -25,6 +25,10 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.messaging.EventUtils;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
@@ -46,6 +50,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
 import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.ConstraintsSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
@@ -61,6 +66,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
   private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
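+  // Constraint metadata is written under <dumpRoot>/<dbName>/_constraints, one file per table.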
"_constraints"; private static final String FUNCTION_METADATA_FILE_NAME = "_metadata"; private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class); @@ -184,6 +190,7 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws LOG.debug( "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); dumpTable(dbName, tblName, dbRoot); + dumpConstraintMetadata(dbName, tblName, dumpRoot); } } Long bootDumpEndReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId(); @@ -295,6 +302,22 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception } } + private void dumpConstraintMetadata(String dbName, String tblName, Path dumpRoot) throws Exception { + Path constraintsRoot = new Path(new Path(dumpRoot, dbName), CONSTRAINTS_ROOT_DIR_NAME); + Path constraintsFile = new Path(constraintsRoot, tblName); + List pks = getHive().getPrimaryKeyList(dbName, tblName); + List fks = getHive().getForeignKeyList(dbName, tblName); + List uks = getHive().getUniqueConstraintList(dbName, tblName); + List nns = getHive().getNotNullConstraintList(dbName, tblName); + if (!pks.isEmpty() || !fks.isEmpty() || !uks.isEmpty() || !nns.isEmpty()) { + try (JsonWriter jsonWriter = + new JsonWriter(constraintsFile.getFileSystem(conf), constraintsFile)) { + ConstraintsSerializer serializer = new ConstraintsSerializer(pks, fks, uks, nns, conf); + serializer.writeTo(jsonWriter, null); + } + } + } + private HiveWrapper.Tuple functionTuple(String functionName, String dbName) { try { HiveWrapper.Tuple tuple = new HiveWrapper(getHive(), dbName).function(functionName); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index 6ea1754..44b32f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -22,11 +22,14 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; @@ -72,6 +75,7 @@ protected int execute(DriverContext driverContext) { a database ( directory ) */ BootstrapEventsIterator iterator = work.iterator(); + ConstraintEventsIterator constraintIterator = work.constraintIterator(); /* This is used to get hold of a reference during the current creation of tasks and is initialized with "0" tasks such that it will be non consequential in any operations done with task tracker @@ -80,8 +84,17 @@ a database ( 
+    boolean loadingConstraint = false;
+    if (!iterator.hasNext() && constraintIterator.hasNext()) {
+      loadingConstraint = true;
+    }
+    while ((iterator.hasNext() || loadingConstraint && constraintIterator.hasNext())
+        && loadTaskTracker.canAddMoreTasks()) {
+      BootstrapEvent next;
+      if (!loadingConstraint) {
+        next = iterator.next();
+      } else {
+        next = constraintIterator.next();
+      }
       switch (next.eventType()) {
       case Database:
         DatabaseEvent dbEvent = (DatabaseEvent) next;
@@ -168,11 +181,20 @@ a database ( directory )
         functionsTracker.debugLog("functions");
         break;
       }
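+      // Each constraints file is expanded by LoadConstraint into ALTER TABLE ... ADD CONSTRAINT tasks.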
+      case Constraint: {
+        LoadConstraint loadConstraint =
+            new LoadConstraint(context, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker);
+        TaskTracker constraintTracker = loadConstraint.tasks();
+        scope.rootTasks.addAll(constraintTracker.tasks());
+        loadTaskTracker.update(constraintTracker);
+        constraintTracker.debugLog("constraints");
+      }
       }
     }
-    boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState();
+    boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState()
+        || constraintIterator.hasNext();
     createBuilderTask(scope.rootTasks, addAnotherLoadTask);
-    if (!iterator.hasNext()) {
+    if (!iterator.hasNext() && !constraintIterator.hasNext()) {
       loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
     }
     this.childTasks = scope.rootTasks;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
index eb18e5f..aceb73c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
@@ -20,6 +20,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
 import org.apache.hadoop.hive.ql.plan.Explain;
 
 import java.io.IOException;
@@ -32,6 +33,7 @@ Licensed to the Apache Software Foundation (ASF) under one
   final String dbNameToLoadIn;
   final String tableNameToLoadIn;
   private final BootstrapEventsIterator iterator;
+  private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
   private DatabaseEvent.State state = null;
 
@@ -39,6 +41,7 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoad
       String tableNameToLoadIn) throws IOException {
     this.tableNameToLoadIn = tableNameToLoadIn;
     this.iterator = new BootstrapEventsIterator(dumpDirectory, hiveConf);
+    this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
     this.dbNameToLoadIn = dbNameToLoadIn;
   }
 
@@ -51,6 +54,10 @@ public BootstrapEventsIterator iterator() {
     return iterator;
   }
 
+  public ConstraintEventsIterator constraintIterator() {
+    return constraintsIterator;
+  }
+
   int executedLoadTask() {
     return ++loadTaskRunCount;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java
index db2b0ac..7b7aac9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java
@@ -22,7 +22,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 
   EventType eventType();
 
   enum EventType {
-    Database, Table, Function, Partition
+    Database, Table, Function, Partition, Constraint
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/ConstraintEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/ConstraintEvent.java
new file mode 100644
index 0000000..7429283
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/ConstraintEvent.java
@@ -0,0 +1,24 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
+
+import org.apache.hadoop.fs.Path;
+
+public interface ConstraintEvent extends BootstrapEvent {
+  Path rootDir();
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
new file mode 100644
index 0000000..bacf158
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
@@ -0,0 +1,87 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
+
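+/**
+ * Walks the _constraints directory under each database directory of a bootstrap
+ * dump and yields one FSConstraintEvent per constraints file found.
+ */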
+public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> {
+  private FileStatus[] dbDirs;
+  private int currentDbIndex;
+  private FileStatus[] constraintFiles = null;
+  private int currentConstraintIndex;
+  private FileSystem fs;
+
+  public ConstraintEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException {
+    Path path = new Path(dumpDirectory);
+    fs = path.getFileSystem(hiveConf);
+    dbDirs = fs.listStatus(new Path(dumpDirectory), EximUtil.getDirectoryFilter(fs));
+    currentDbIndex = 0;
+    if (dbDirs.length != 0) {
+      currentConstraintIndex = 0;
+      constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath());
+    }
+  }
+
+  private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir) {
+    try {
+      return fs.listStatus(new Path(dbDir, ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME));
+    } catch (FileNotFoundException e) {
+      return new FileStatus[]{};
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (constraintFiles != null && currentConstraintIndex < constraintFiles.length) {
+      return true;
+    }
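+    // Current database directory is exhausted; advance until one with constraint files is found.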
+    while (constraintFiles != null && currentConstraintIndex == constraintFiles.length) {
+      currentDbIndex++;
+      if (currentDbIndex < dbDirs.length) {
+        currentConstraintIndex = 0;
+        constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[currentDbIndex].getPath());
+      } else {
+        constraintFiles = null;
+      }
+    }
+    return constraintFiles != null;
+  }
+
+  @Override
+  public FSConstraintEvent next() {
+    int thisIndex = currentConstraintIndex;
+    currentConstraintIndex++;
+    return new FSConstraintEvent(constraintFiles[thisIndex].getPath());
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSConstraintEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSConstraintEvent.java
new file mode 100644
index 0000000..a2ad444
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSConstraintEvent.java
@@ -0,0 +1,39 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
+
+public class FSConstraintEvent implements ConstraintEvent {
+  private final Path rootDir;
+
+  FSConstraintEvent(Path rootDir) {
+    this.rootDir = rootDir;
+  }
+
+  @Override
+  public Path rootDir() {
+    return rootDir;
+  }
+
+  @Override
+  public EventType eventType() {
+    return EventType.Constraint;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
new file mode 100644
index 0000000..fc2aa8d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
@@ -0,0 +1,119 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.AddForeignKeyHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.AddNotNullConstraintHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.AddPrimaryKeyHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.AddUniqueConstraintHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.stripQuotes;
+
+public class LoadConstraint {
+  private static final Logger LOG = LoggerFactory.getLogger(LoadConstraint.class);
+  private Context context;
+  private final ConstraintEvent event;
+  private final String dbNameToLoadIn;
+  private final TaskTracker tracker;
+
+  public LoadConstraint(Context context, ConstraintEvent event, String dbNameToLoadIn,
+      TaskTracker existingTracker) {
+    this.context = context;
+    this.event = event;
+    this.dbNameToLoadIn = dbNameToLoadIn;
+    this.tracker = new TaskTracker(existingTracker);
+  }
+
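+  /**
+   * Reads the table's _constraints file and replays each stored message through the
+   * corresponding incremental-load handler to produce ALTER TABLE ... ADD CONSTRAINT tasks.
+   */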
+  public TaskTracker tasks() throws IOException, SemanticException {
+    URI fromURI = EximUtil
+        .getValidatedURI(context.hiveConf, stripQuotes(event.rootDir().toUri().toString()));
+    Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+
+    try {
+      FileSystem fs = FileSystem.get(fromPath.toUri(), context.hiveConf);
+      JSONObject json = new JSONObject(EximUtil.readAsString(fs, fromPath));
+      String pksString = json.getString("pks");
+      String fksString = json.getString("fks");
+      String uksString = json.getString("uks");
+      String nnsString = json.getString("nns");
+      List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+
+      AddPrimaryKeyHandler pkHandler = new AddPrimaryKeyHandler();
+      DumpMetaData pkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_PRIMARYKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
+          context.hiveConf);
+      pkDumpMetaData.setPayload(pksString);
+      tasks.addAll(pkHandler.handle(
+          new MessageHandler.Context(
+              dbNameToLoadIn, null, fromPath.toString(), null, pkDumpMetaData, context.hiveConf,
+              context.hiveDb, null, LOG)));
+
+      AddForeignKeyHandler fkHandler = new AddForeignKeyHandler();
+      DumpMetaData fkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_FOREIGNKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
+          context.hiveConf);
+      fkDumpMetaData.setPayload(fksString);
+      tasks.addAll(fkHandler.handle(
+          new MessageHandler.Context(
+              dbNameToLoadIn, null, fromPath.toString(), null, fkDumpMetaData, context.hiveConf,
+              context.hiveDb, null, LOG)));
+
+      AddUniqueConstraintHandler ukHandler = new AddUniqueConstraintHandler();
+      DumpMetaData ukDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_UNIQUECONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
+          context.hiveConf);
+      ukDumpMetaData.setPayload(uksString);
+      tasks.addAll(ukHandler.handle(
+          new MessageHandler.Context(
+              dbNameToLoadIn, null, fromPath.toString(), null, ukDumpMetaData, context.hiveConf,
+              context.hiveDb, null, LOG)));
+
+      AddNotNullConstraintHandler nnHandler = new AddNotNullConstraintHandler();
+      DumpMetaData nnDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_NOTNULLCONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
+          context.hiveConf);
+      nnDumpMetaData.setPayload(nnsString);
+      tasks.addAll(nnHandler.handle(
+          new MessageHandler.Context(
+              dbNameToLoadIn, null, fromPath.toString(), null, nnDumpMetaData, context.hiveConf,
+              context.hiveDb, null, LOG)));
+
+      tasks.forEach(tracker::addTask);
+      return tracker;
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+    }
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index d661f10..f4b95a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4208,6 +4208,38 @@ public void dropConstraint(String dbName, String tableName, String constraintNam
     }
   }
 
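+  // Thin wrappers around the metastore client; used by REPL DUMP to fetch a table's constraints.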
+  public List<SQLPrimaryKey> getPrimaryKeyList(String dbName, String tblName) throws HiveException {
+    try {
+      return getMSC().getPrimaryKeys(new PrimaryKeysRequest(dbName, tblName));
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public List<SQLForeignKey> getForeignKeyList(String dbName, String tblName) throws HiveException {
+    try {
+      return getMSC().getForeignKeys(new ForeignKeysRequest(null, null, dbName, tblName));
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public List<SQLUniqueConstraint> getUniqueConstraintList(String dbName, String tblName) throws HiveException {
+    try {
+      return getMSC().getUniqueConstraints(new UniqueConstraintsRequest(dbName, tblName));
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public List<SQLNotNullConstraint> getNotNullConstraintList(String dbName, String tblName) throws HiveException {
+    try {
+      return getMSC().getNotNullConstraints(new NotNullConstraintsRequest(dbName, tblName));
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
   /**
    * Get all primary key columns associated with the table.
    *
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 22094c0..373321e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -272,7 +272,7 @@ public static MetaData readMetaData(FileSystem fs, Path metadataPath)
     }
   }
 
-  private static String readAsString(final FileSystem fs, final Path fromMetadataPath)
+  public static String readAsString(final FileSystem fs, final Path fromMetadataPath)
       throws IOException {
     try (FSDataInputStream stream = fs.open(fromMetadataPath)) {
       byte[] buffer = new byte[1024];
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index d4fc340..1eb3c01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -73,6 +73,7 @@ Licensed to the Apache Software Foundation (ASF) under one
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
+  public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
   private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");
 
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java
new file mode 100644
index 0000000..22de079
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
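+/**
+ * Writes a table's constraints as the same JSON messages the incremental-replication
+ * events use, so the load side can feed them straight into the Add*Handler classes.
+ */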
+public class ConstraintsSerializer implements JsonWriter.Serializer {
+  private HiveConf hiveConf;
+  private List<SQLPrimaryKey> pks;
+  private List<SQLForeignKey> fks;
+  private List<SQLUniqueConstraint> uks;
+  private List<SQLNotNullConstraint> nns;
+
+  public ConstraintsSerializer(List<SQLPrimaryKey> pks, List<SQLForeignKey> fks,
+      List<SQLUniqueConstraint> uks, List<SQLNotNullConstraint> nns, HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+    this.pks = pks;
+    this.fks = fks;
+    this.uks = uks;
+    this.nns = nns;
+  }
+
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    String pksString, fksString, uksString, nnsString;
+    pksString = fksString = uksString = nnsString = "";
+    if (pks != null) {
+      pksString = MessageFactory.getInstance().buildAddPrimaryKeyMessage(pks).toString();
+    }
+    if (fks != null) {
+      fksString = MessageFactory.getInstance().buildAddForeignKeyMessage(fks).toString();
+    }
+    if (uks != null) {
+      uksString = MessageFactory.getInstance().buildAddUniqueConstraintMessage(uks).toString();
+    }
+    if (nns != null) {
+      nnsString = MessageFactory.getInstance().buildAddNotNullConstraintMessage(nns).toString();
+    }
+    writer.jsonGenerator.writeStringField("pks", pksString);
+    writer.jsonGenerator.writeStringField("fks", fksString);
+    writer.jsonGenerator.writeStringField("uks", uksString);
+    writer.jsonGenerator.writeStringField("nns", nnsString);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
index 39697bb..cf6feae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
@@ -49,6 +49,11 @@
       }
     }
 
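+    // The dumped message for a constraint type the table lacks deserializes to an
+    // empty list; in that case there is no ALTER TABLE task to create.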
+    List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+    if (fks.isEmpty()) {
+      return tasks;
+    }
+
     String actualDbName = context.isDbNameEmpty() ? fks.get(0).getFktable_db() : context.dbName;
     String actualTblName = context.isTableNameEmpty() ? fks.get(0).getPktable_name() : context.tableName;
 
@@ -61,7 +66,6 @@
     AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "." + actualTblName, new ArrayList<SQLPrimaryKey>(),
         fks, new ArrayList<SQLUniqueConstraint>(), context.eventOnlyReplicationSpec());
     Task<? extends Serializable> addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf);
-    List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
     databasesUpdated.put(actualDbName, context.dmd.getEventTo());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
index e2c1d1d..1190ad0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
@@ -27,7 +27,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
-import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -51,6 +50,11 @@
       }
     }
 
+    List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+    if (nns.isEmpty()) {
+      return tasks;
+    }
+
     String actualDbName = context.isDbNameEmpty() ? nns.get(0).getTable_db() : context.dbName;
     String actualTblName = context.isTableNameEmpty() ? nns.get(0).getTable_name() : context.tableName;
 
@@ -62,7 +66,6 @@
     AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "." + actualTblName, new ArrayList<SQLPrimaryKey>(), new ArrayList<SQLForeignKey>(),
         new ArrayList<SQLUniqueConstraint>(), nns, context.eventOnlyReplicationSpec());
     Task<? extends Serializable> addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf);
-    List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
     databasesUpdated.put(actualDbName, context.dmd.getEventTo());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
index 7babb6a..4306c10 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
@@ -48,6 +48,12 @@
         throw (SemanticException)e;
       }
     }
+
+    List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+    if (pks.isEmpty()) {
+      return tasks;
+    }
+
     String actualDbName = context.isDbNameEmpty() ? pks.get(0).getTable_db() : context.dbName;
     String actualTblName = context.isTableNameEmpty() ? pks.get(0).getTable_name() : context.tableName;
 
@@ -59,7 +65,6 @@
     AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "." + actualTblName, pks, new ArrayList<SQLForeignKey>(),
        new ArrayList<SQLUniqueConstraint>(), context.eventOnlyReplicationSpec());
     Task<? extends Serializable> addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf);
-    List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
     databasesUpdated.put(actualDbName, context.dmd.getEventTo());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
index e7b404a..e31943c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
@@ -49,6 +49,11 @@
       }
     }
 
+    List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+    if (uks.isEmpty()) {
+      return tasks;
+    }
+
     String actualDbName = context.isDbNameEmpty() ? uks.get(0).getTable_db() : context.dbName;
     String actualTblName = context.isTableNameEmpty() ? uks.get(0).getTable_name() : context.tableName;
 
@@ -60,7 +65,6 @@
     AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "." + actualTblName, new ArrayList<SQLPrimaryKey>(), new ArrayList<SQLForeignKey>(),
         uks, context.eventOnlyReplicationSpec());
     Task<? extends Serializable> addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf);
-    List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
     databasesUpdated.put(actualDbName, context.dmd.getEventTo());