diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index eade36f..d08490f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -74,6 +74,21 @@ Licensed to the Apache Software Foundation (ASF) under one
   private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
   private static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
   private static final String FUNCTION_METADATA_FILE_NAME = "_metadata";
+  public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_");
+    private final String name;
+    private final String prefix;
+    private ConstraintFileType(String name, String prefix) {
+      this.name = name;
+      this.prefix = prefix;
+    }
+    public String getName() {
+      return this.name;
+    }
+
+    public String getPrefix() {
+      return prefix;
+    }
+  }

   private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class);
   private ReplLogger replLogger;
@@ -313,17 +328,25 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception
   private void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws Exception {
     try {
       Path constraintsRoot = new Path(dbRoot, CONSTRAINTS_ROOT_DIR_NAME);
-      Path constraintsFile = new Path(constraintsRoot, tblName);
+      Path commonConstraintsFile = new Path(constraintsRoot, ConstraintFileType.COMMON.getPrefix() + tblName);
+      Path fkConstraintsFile = new Path(constraintsRoot, ConstraintFileType.FOREIGNKEY.getPrefix() + tblName);
       Hive db = getHive();
       List<SQLPrimaryKey> pks = db.getPrimaryKeyList(dbName, tblName);
       List<SQLForeignKey> fks = db.getForeignKeyList(dbName, tblName);
       List<SQLUniqueConstraint> uks = db.getUniqueConstraintList(dbName, tblName);
       List<SQLNotNullConstraint> nns = db.getNotNullConstraintList(dbName, tblName);
-      if ((pks != null && !pks.isEmpty()) || (fks != null && !fks.isEmpty()) || (uks != null && !uks.isEmpty())
+      if ((pks != null && !pks.isEmpty()) || (uks != null && !uks.isEmpty())
           || (nns != null && !nns.isEmpty())) {
         try (JsonWriter jsonWriter =
-            new JsonWriter(constraintsFile.getFileSystem(conf), constraintsFile)) {
-          ConstraintsSerializer serializer = new ConstraintsSerializer(pks, fks, uks, nns, conf);
+            new JsonWriter(commonConstraintsFile.getFileSystem(conf), commonConstraintsFile)) {
+          ConstraintsSerializer serializer = new ConstraintsSerializer(pks, null, uks, nns, conf);
+          serializer.writeTo(jsonWriter, null);
+        }
+      }
+      if (fks != null && !fks.isEmpty()) {
+        try (JsonWriter jsonWriter =
+            new JsonWriter(fkConstraintsFile.getFileSystem(conf), fkConstraintsFile)) {
+          ConstraintsSerializer serializer = new ConstraintsSerializer(null, fks, null, null, conf);
           serializer.writeTo(jsonWriter, null);
         }
       }
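
Note on the dump side: each table's constraints are now split across two files under _constraints, named by the enum prefixes, so that foreign keys can be replayed after the primary/unique keys they reference. A minimal, standalone sketch of the resulting naming scheme; the class name and the table name "t1" are hypothetical, and the enum is reduced to just its prefix:

    public class ConstraintFileNamingSketch {
      // Mirrors ReplDumpTask.ConstraintFileType from the patch above, prefix only.
      enum ConstraintFileType {
        COMMON("c_"), FOREIGNKEY("f_");
        private final String prefix;
        ConstraintFileType(String prefix) {
          this.prefix = prefix;
        }
        String getPrefix() {
          return prefix;
        }
      }

      public static void main(String[] args) {
        String tblName = "t1";  // hypothetical table name
        // PK, unique and NOT NULL constraints land in _constraints/c_t1 ...
        System.out.println(ConstraintFileType.COMMON.getPrefix() + tblName);
        // ... while foreign keys get their own _constraints/f_t1 file.
        System.out.println(ConstraintFileType.FOREIGNKEY.getPrefix() + tblName);
      }
    }
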
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
index 12d4c0d..d1ffe12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
@@ -25,7 +25,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask.ConstraintFileType;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;

@@ -36,15 +38,20 @@ Licensed to the Apache Software Foundation (ASF) under one
   private int currentConstraintIndex;
   private FileSystem fs;
   private Path path;
+  private ConstraintFileType mode = ConstraintFileType.COMMON;

   public ConstraintEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException {
     path = new Path(dumpDirectory);
     fs = path.getFileSystem(hiveConf);
   }

-  private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir) {
+  private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir, String prefix) {
     try {
-      return fs.listStatus(new Path(dbDir, ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME));
+      return fs.listStatus(new Path(dbDir, ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME), new PathFilter() {
+        public boolean accept(Path p) {
+          return p.getName().startsWith(prefix);
+        }
+      });
     } catch (FileNotFoundException e) {
       return new FileStatus[]{};
     } catch (IOException e) {
@@ -52,8 +59,7 @@ public ConstraintEventsIterator(String dumpDirectory, HiveConf hiveConf) throws
     }
   }

-  @Override
-  public boolean hasNext() {
+  boolean hasNext(ConstraintFileType type) {
     if (dbDirs == null) {
       try {
         dbDirs = fs.listStatus(path, EximUtil.getDirectoryFilter(fs));
@@ -63,7 +69,7 @@ public boolean hasNext() {
       currentDbIndex = 0;
       if (dbDirs.length != 0) {
         currentConstraintIndex = 0;
-        constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath());
+        constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath(), type.getPrefix());
       }
     }
     if ((currentDbIndex < dbDirs.length) && (currentConstraintIndex < constraintFiles.length)) {
@@ -73,7 +79,7 @@ public boolean hasNext() {
       currentDbIndex ++;
       if (currentDbIndex < dbDirs.length) {
         currentConstraintIndex = 0;
-        constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[currentDbIndex].getPath());
+        constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[currentDbIndex].getPath(), type.getPrefix());
       } else {
         constraintFiles = null;
       }
@@ -82,6 +88,22 @@
   }

   @Override
+  public boolean hasNext() {
+    if (mode == ConstraintFileType.COMMON) {
+      if (hasNext(ConstraintFileType.COMMON)) {
+        return true;
+      } else {
+        // Switch to iterate foreign keys
+        mode = ConstraintFileType.FOREIGNKEY;
+        currentDbIndex = 0;
+        currentConstraintIndex = 0;
+        dbDirs = null;
+      }
+    }
+    return hasNext(ConstraintFileType.FOREIGNKEY);
+  }
+
+  @Override
   public FSConstraintEvent next() {
     int thisIndex = currentConstraintIndex;
     currentConstraintIndex++;
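
Note on the iterator: hasNext() now makes two passes over the dump directory, first over the "c_" files and then, after resetting its cursors, over the "f_" files, which guarantees every foreign-key event is returned last. A minimal, standalone sketch of the same two-phase pattern over plain lists; the class name and sample file names are hypothetical:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class TwoPhaseIteratorSketch implements Iterator<String> {
      private final List<String> common;
      private final List<String> foreignKeys;
      private boolean fkMode = false;  // plays the role of the ConstraintFileType mode field
      private int index = 0;

      public TwoPhaseIteratorSketch(List<String> common, List<String> foreignKeys) {
        this.common = common;
        this.foreignKeys = foreignKeys;
      }

      @Override
      public boolean hasNext() {
        if (!fkMode) {
          if (index < common.size()) {
            return true;
          }
          fkMode = true;  // switch to iterate foreign keys
          index = 0;      // reset position, as the patch does with dbDirs = null
        }
        return index < foreignKeys.size();
      }

      @Override
      public String next() {
        return (fkMode ? foreignKeys : common).get(index++);
      }

      public static void main(String[] args) {
        Iterator<String> it = new TwoPhaseIteratorSketch(
            Arrays.asList("c_t1", "c_t2"), Arrays.asList("f_t1"));
        while (it.hasNext()) {
          System.out.println(it.next());  // c_t1, c_t2, f_t1
        }
      }
    }
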
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
index 200ac0a..2ddbda8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
@@ -73,41 +73,49 @@ public TaskTracker tasks() throws IOException, SemanticException {
       String nnsString = json.getString("nns");
       List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();

-      AddPrimaryKeyHandler pkHandler = new AddPrimaryKeyHandler();
-      DumpMetaData pkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_PRIMARYKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
-          context.hiveConf);
-      pkDumpMetaData.setPayload(pksString);
-      tasks.addAll(pkHandler.handle(
-          new MessageHandler.Context(
-              dbNameToLoadIn, null, fromPath.toString(), null, pkDumpMetaData, context.hiveConf,
-              context.hiveDb, null, LOG)));
+      if (pksString != null && !pksString.isEmpty()) {
+        AddPrimaryKeyHandler pkHandler = new AddPrimaryKeyHandler();
+        DumpMetaData pkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_PRIMARYKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
+            context.hiveConf);
+        pkDumpMetaData.setPayload(pksString);
+        tasks.addAll(pkHandler.handle(
+            new MessageHandler.Context(
+                dbNameToLoadIn, null, fromPath.toString(), null, pkDumpMetaData, context.hiveConf,
+                context.hiveDb, null, LOG)));
+      }

-      AddUniqueConstraintHandler ukHandler = new AddUniqueConstraintHandler();
-      DumpMetaData ukDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_UNIQUECONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
-          context.hiveConf);
-      ukDumpMetaData.setPayload(uksString);
-      tasks.addAll(ukHandler.handle(
-          new MessageHandler.Context(
-              dbNameToLoadIn, null, fromPath.toString(), null, ukDumpMetaData, context.hiveConf,
-              context.hiveDb, null, LOG)));
+      if (uksString != null && !uksString.isEmpty()) {
+        AddUniqueConstraintHandler ukHandler = new AddUniqueConstraintHandler();
+        DumpMetaData ukDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_UNIQUECONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
+            context.hiveConf);
+        ukDumpMetaData.setPayload(uksString);
+        tasks.addAll(ukHandler.handle(
+            new MessageHandler.Context(
+                dbNameToLoadIn, null, fromPath.toString(), null, ukDumpMetaData, context.hiveConf,
+                context.hiveDb, null, LOG)));
+      }

-      AddNotNullConstraintHandler nnHandler = new AddNotNullConstraintHandler();
-      DumpMetaData nnDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_NOTNULLCONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
-          context.hiveConf);
-      nnDumpMetaData.setPayload(nnsString);
-      tasks.addAll(nnHandler.handle(
-          new MessageHandler.Context(
-              dbNameToLoadIn, null, fromPath.toString(), null, nnDumpMetaData, context.hiveConf,
-              context.hiveDb, null, LOG)));
+      if (nnsString != null && !nnsString.isEmpty()) {
+        AddNotNullConstraintHandler nnHandler = new AddNotNullConstraintHandler();
+        DumpMetaData nnDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_NOTNULLCONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
+            context.hiveConf);
+        nnDumpMetaData.setPayload(nnsString);
+        tasks.addAll(nnHandler.handle(
+            new MessageHandler.Context(
+                dbNameToLoadIn, null, fromPath.toString(), null, nnDumpMetaData, context.hiveConf,
+                context.hiveDb, null, LOG)));
+      }

-      AddForeignKeyHandler fkHandler = new AddForeignKeyHandler();
-      DumpMetaData fkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_FOREIGNKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
-          context.hiveConf);
-      fkDumpMetaData.setPayload(fksString);
-      tasks.addAll(fkHandler.handle(
-          new MessageHandler.Context(
-              dbNameToLoadIn, null, fromPath.toString(), null, fkDumpMetaData, context.hiveConf,
-              context.hiveDb, null, LOG)));
+      if (fksString != null && !fksString.isEmpty()) {
+        AddForeignKeyHandler fkHandler = new AddForeignKeyHandler();
+        DumpMetaData fkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_FOREIGNKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
+            context.hiveConf);
+        fkDumpMetaData.setPayload(fksString);
+        tasks.addAll(fkHandler.handle(
+            new MessageHandler.Context(
+                dbNameToLoadIn, null, fromPath.toString(), null, fkDumpMetaData, context.hiveConf,
+                context.hiveDb, null, LOG)));
+      }

       tasks.forEach(tracker::addTask);
       return tracker;
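
Note on the load side: each Add*Handler now runs only when its payload is present, which matters because a "c_" file never carries foreign keys and an "f_" file carries nothing else, so every load would otherwise feed empty payloads through the handlers. A minimal, standalone sketch of the guard pattern; the class name, helper, and payload strings are hypothetical:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;

    public class GuardedDispatchSketch {
      // Run a handler only when its serialized payload is actually present,
      // mirroring the null/empty checks added around each handler above.
      static void addTasksIfPresent(String payload, Function<String, List<String>> handler,
          List<String> tasks) {
        if (payload != null && !payload.isEmpty()) {
          tasks.addAll(handler.apply(payload));
        }
      }

      public static void main(String[] args) {
        List<String> tasks = new ArrayList<>();
        // Loading a "c_" file: primary keys present, foreign keys absent.
        addTasksIfPresent("pk-payload", p -> Arrays.asList("AddPrimaryKeyTask"), tasks);
        addTasksIfPresent("", p -> Arrays.asList("AddForeignKeyTask"), tasks);  // skipped
        System.out.println(tasks);  // prints [AddPrimaryKeyTask]
      }
    }
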