diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 5a187f4e65..6d1be3174f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -2138,7 +2138,13 @@ public void testStatus() throws IOException { dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, "ALTER TABLE " + dbName + ".ptned2 DROP PARTITION (b=11)"); - assertTrue(finalTblReplDumpId.compareTo(lastTblReplDumpId) > 0); + /* + Comparing event ids as Strings is wrong; they should be compared as numbers, since lexicographic string comparison + and numeric comparison differ. This requires a broader change where we return the dump id as a long and not a string; + fixing it here for now, as it was observed in one of the builds that "1001".compareTo("998") caused + the assertion below to fail. + */ + assertTrue(new Long(Long.parseLong(finalTblReplDumpId)).compareTo(Long.parseLong(lastTblReplDumpId)) > 0); // TODO : currently not testing the following scenarios: // a) Multi-db wh-level REPL LOAD - need to add that diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 41e834d752..abd916965b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -58,21 +58,21 @@ Licensed to the Apache Software Foundation (ASF) under one private static WarehouseInstance primary, replica; - @BeforeClass - public static void classLevelSetup() throws Exception { - Configuration conf = new Configuration(); - conf.set("dfs.client.use.datanode.hostname", "true"); - MiniDFSCluster miniDFSCluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); - primary = new WarehouseInstance(LOG, miniDFSCluster); - replica = new WarehouseInstance(LOG, miniDFSCluster); - } + @BeforeClass + public static void classLevelSetup() throws Exception { + Configuration conf = new Configuration(); + conf.set("dfs.client.use.datanode.hostname", "true"); + MiniDFSCluster miniDFSCluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + primary = new WarehouseInstance(LOG, miniDFSCluster); + replica = new WarehouseInstance(LOG, miniDFSCluster); + } - @AfterClass - public static void classLevelTearDown() throws IOException { - primary.close(); - replica.close(); - } + @AfterClass + public static void classLevelTearDown() throws IOException { + primary.close(); + replica.close(); + } private String primaryDbName, replicatedDbName; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index a35f7b20b4..6cbe8cb98b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -94,10 +94,9 @@ private void initialize(String cmRoot) throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
hiveConf.setVar(HiveConf.ConfVars.REPLCMDIR, cmRoot); hiveConf.setVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR, functionsRoot); - String schemaName = "APP" + uniqueIdentifier; - System.setProperty("datanucleus.mapping.Schema", schemaName); + System.setProperty("datanucleus.mapping.Schema", "APP"); hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, - "jdbc:derby:memory:${test.tmp.dir}/" + schemaName + ";create=true"); + "jdbc:derby:memory:${test.tmp.dir}/APP;create=true"); int metaStorePort = MetaStoreUtils.startMetaStore(hiveConf); hiveConf.setVar(HiveConf.ConfVars.REPLDIR, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 285f6248f6..8e7704d1b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -159,7 +159,7 @@ protected int execute(DriverContext driverContext) { if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){ LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath); - if (!doCopy(toPath, dstFs, oneSrc.getPath(), actualSrcFs)) { + if (!doCopy(toPath, dstFs, oneSrc.getPath(), actualSrcFs, conf)) { console.printError("Failed to copy: '" + oneSrc.getPath().toString() + "to: '" + toPath.toString() + "'"); return 1; @@ -186,7 +186,8 @@ protected int execute(DriverContext driverContext) { } } - private boolean doCopy(Path dst, FileSystem dstFs, Path src, FileSystem srcFs) throws IOException { + public static boolean doCopy(Path dst, FileSystem dstFs, Path src, FileSystem srcFs, + HiveConf conf) throws IOException { if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) || isLocalFile(src) || isLocalFile(dst)){ // regular copy in test env, or when source or destination is a local file @@ -200,7 +201,7 @@ private boolean doCopy(Path dst, FileSystem dstFs, Path src, FileSystem srcFs) t } } - private boolean isLocalFile(Path p) { + private static boolean isLocalFile(Path p) { String scheme = p.toUri().getScheme(); boolean isLocalFile = scheme.equalsIgnoreCase("file"); LOG.debug("{} was a local file? 
{}, had scheme {}",p.toUri(), isLocalFile, scheme); @@ -275,23 +276,4 @@ public String getName() { } return copyTask; } - - public static Task getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) { - Task copyTask = null; - LOG.debug("ReplCopyTask:getDumpCopyTask: "+srcPath + "=>" + dstPath); - if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ - ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); - LOG.debug("ReplCopyTask:\trcwork"); - if (replicationSpec.isLazy()){ - LOG.debug("ReplCopyTask:\tlazy"); - rcwork.setListFilesOnOutputBehaviour(true); - } - copyTask = TaskFactory.get(rcwork, conf); - } else { - LOG.debug("ReplCopyTask:\tcwork"); - copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf); - } - return copyTask; - } - } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 1bff176166..22094c0563 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -140,7 +140,7 @@ private EximUtil() { * Initialize the URI where the exported data collection is * to created for export, or is present for import */ - static URI getValidatedURI(HiveConf conf, String dcPath) throws SemanticException { + public static URI getValidatedURI(HiveConf conf, String dcPath) throws SemanticException { try { boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE); URI uri = new Path(dcPath).toUri(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index e101d72d4c..fdf6c3c8d1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -1,49 +1,28 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
*/ package org.apache.hadoop.hive.ql.parse; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.Serializable; -import java.net.URI; -import java.util.HashSet; -import java.util.List; - import org.antlr.runtime.tree.Tree; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.exec.ReplCopyTask; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.hooks.ReadEntity; -import org.apache.hadoop.hive.ql.hooks.WriteEntity; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; -import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.metadata.Table; -import org.slf4j.Logger; +import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; /** * ExportSemanticAnalyzer. @@ -51,9 +30,7 @@ */ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { - private ReplicationSpec replicationSpec; - - public ExportSemanticAnalyzer(QueryState queryState) throws SemanticException { + ExportSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); } @@ -62,16 +39,13 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { Tree tableTree = ast.getChild(0); Tree toTree = ast.getChild(1); + ReplicationSpec replicationSpec; if (ast.getChildCount() > 2) { replicationSpec = new ReplicationSpec((ASTNode) ast.getChild(2)); } else { replicationSpec = new ReplicationSpec(); } - // initialize export path - String tmpPath = stripQuotes(toTree.getText()); - URI toURI = EximUtil.getValidatedURI(conf, tmpPath); - // initialize source table/partition TableSpec ts; @@ -93,126 +67,13 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { } } + // initialize export path + String tmpPath = stripQuotes(toTree.getText()); // All parsing is done, we're now good to start the export process. - prepareExport(ast, toURI, ts, replicationSpec, db, conf, ctx, rootTasks, inputs, outputs, LOG); - - } - - // FIXME : Move to EximUtil - it's okay for this to stay here for a little while more till we finalize the statics - public static void prepareExport( - ASTNode ast, URI toURI, TableSpec ts, - ReplicationSpec replicationSpec, Hive db, HiveConf conf, - Context ctx, List> rootTasks, HashSet inputs, HashSet outputs, - Logger LOG) throws SemanticException { - - if (ts != null) { - try { - EximUtil.validateTable(ts.tableHandle); - if (replicationSpec.isInReplicationScope() - && ts.tableHandle.isTemporary()){ - // No replication for temporary tables either - ts = null; - } else if (ts.tableHandle.isView()) { - replicationSpec.setIsMetadataOnly(true); - } - - } catch (SemanticException e) { - // table was a non-native table or an offline table. - // ignore for replication, error if not. - if (replicationSpec.isInReplicationScope()){ - ts = null; // null out ts so we can't use it. 
- } else { - throw e; - } - } - } - - try { - - FileSystem fs = FileSystem.get(toURI, conf); - Path toPath = new Path(toURI.getScheme(), toURI.getAuthority(), toURI.getPath()); - try { - FileStatus tgt = fs.getFileStatus(toPath); - // target exists - if (!tgt.isDir()) { - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast, - "Target is not a directory : " + toURI)); - } else { - FileStatus[] files = fs.listStatus(toPath, FileUtils.HIDDEN_FILES_PATH_FILTER); - if (files != null && files.length != 0) { - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast, - "Target is not an empty directory : " + toURI)); - } - } - } catch (FileNotFoundException e) { - } - } catch (IOException e) { - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast), e); - } - - PartitionIterable partitions = null; - try { - replicationSpec.setCurrentReplicationState(String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId())); - if ( (ts != null) && (ts.tableHandle.isPartitioned())){ - if (ts.specType == TableSpec.SpecType.TABLE_ONLY){ - // TABLE-ONLY, fetch partitions if regular export, don't if metadata-only - if (replicationSpec.isMetadataOnly()){ - partitions = null; - } else { - partitions = new PartitionIterable(db,ts.tableHandle,null,conf.getIntVar( - HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); - } - } else { - // PARTITIONS specified - partitions inside tableSpec - partitions = new PartitionIterable(ts.partitions); - } - } else { - // Either tableHandle isn't partitioned => null, or repl-export after ts becomes null => null. - // or this is a noop-replication export, so we can skip looking at ptns. - partitions = null; - } - - Path path = new Path(ctx.getLocalTmpPath(), EximUtil.METADATA_NAME); - EximUtil.createExportDump( - FileSystem.getLocal(conf), - path, - (ts != null ? 
ts.tableHandle : null), - partitions, - replicationSpec); - - Task rTask = ReplCopyTask.getDumpCopyTask(replicationSpec, path, new Path(toURI), conf); - - rootTasks.add(rTask); - LOG.debug("_metadata file written into " + path.toString() - + " and then copied to " + toURI.toString()); - } catch (Exception e) { - throw new SemanticException( - ErrorMsg.IO_ERROR - .getMsg("Exception while writing out the local file"), e); - } - - if (!(replicationSpec.isMetadataOnly() || (ts == null))) { - Path parentPath = new Path(toURI); - if (ts.tableHandle.isPartitioned()) { - for (Partition partition : partitions) { - Path fromPath = partition.getDataLocation(); - Path toPartPath = new Path(parentPath, partition.getName()); - Task rTask = - ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toPartPath, conf); - rootTasks.add(rTask); - inputs.add(new ReadEntity(partition)); - } - } else { - Path fromPath = ts.tableHandle.getDataLocation(); - Path toDataPath = new Path(parentPath, EximUtil.DATA_PATH_NAME); - Task rTask = - ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf); - rootTasks.add(rTask); - inputs.add(new ReadEntity(ts.tableHandle)); - } - outputs.add(toWriteEntity(parentPath, conf)); - } + TableExport.Paths exportPaths = new TableExport.Paths(ast, tmpPath, conf); + TableExport.AuthEntities authEntities = + new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).run(); + inputs.addAll(authEntities.inputs); + outputs.addAll(authEntities.outputs); } - - } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 961561db1a..f51f152441 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -1,19 +1,19 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
*/ package org.apache.hadoop.hive.ql.parse; @@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; +import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler; import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory; @@ -77,7 +78,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.function.Consumer; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT; @@ -192,12 +192,13 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { for (String dbName : matchesDb(dbNameOrPattern)) { REPL_STATE_LOG.info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP", dbName); LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName); + Path dbRoot = dumpDbMetadata(dbName, dumpRoot); dumpFunctionMetadata(dbName, dumpRoot); for (String tblName : matchesTbl(dbName, tblNameOrPattern)) { - LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName - + " to db root " + dbRoot.toUri()); - dumpTbl(ast, dbName, tblName, dbRoot); + LOG.debug( + "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); + dumpTable(ast, dbName, tblName, dbRoot); } REPL_STATE_LOG.info("Repl Dump: Completed analyzing Repl Dump for DB: {} and created {} COPY tasks to dump " + "metadata and data", @@ -347,6 +348,7 @@ private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticExcepti FileSystem fs = dbRoot.getFileSystem(conf); Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME); HiveWrapper.Tuple database = new HiveWrapper(db, dbName).database(); + inputs.add(new ReadEntity(database.object)); EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec); REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata"); } catch (Exception e) { @@ -359,7 +361,6 @@ private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticExcepti private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticException { Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME); try { - // TODO : This should ideally return the Function Objects and not Strings(function names) that should be done by the caller, Look at this separately. List functionNames = db.getFunctions(dbName, "*"); for (String functionName : functionNames) { HiveWrapper.Tuple tuple; @@ -393,35 +394,22 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticE } } - /** - * - * @param ast - * @param dbName - * @param tblName - * @param dbRoot - * @return tbl dumped path - * @throws SemanticException - */ - private Path dumpTbl(ASTNode ast, String dbName, String tblName, Path dbRoot) throws SemanticException { - Path tableRoot = new Path(dbRoot, tblName); + private void dumpTable(ASTNode ast, String dbName, String tblName, Path dbRoot) + throws SemanticException { try { - URI toURI = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString()); TableSpec ts = new TableSpec(db, conf, dbName + "." 
+ tblName, null); - - ExportSemanticAnalyzer.prepareExport(ast, toURI, ts, getNewReplicationSpec(), db, conf, ctx, - rootTasks, inputs, outputs, LOG); + TableExport.Paths exportPaths = new TableExport.Paths(ast, dbRoot, tblName, conf); + new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).run(); REPL_STATE_LOG.info("Repl Dump: Analyzed dump for table/view: {}.{} and created copy tasks to dump metadata " + - "and data to path {}", dbName, tblName, toURI.toString()); + "and data to path {}", dbName, tblName, exportPaths.exportRootDir.toString()); } catch (InvalidTableException te) { // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. // Just log a debug message and skip it. LOG.debug(te.getMessage()); - return null; } catch (HiveException e) { // TODO : simple wrap & rethrow for now, clean up with error codes throw new SemanticException(e); } - return tableRoot; } // REPL LOAD diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java index 1dcaec2701..27a6ea6c0e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java @@ -32,7 +32,7 @@ public class HiveWrapper { private final Hive db; private final String dbName; - private final BootStrapReplicationSpecFunction functionForSpec; + private final Tuple.Function functionForSpec; public HiveWrapper(Hive db, String dbName) { this.dbName = dbName; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java new file mode 100644 index 0000000000..144d667f9b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -0,0 +1,271 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.PartitionIterable; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; +import org.slf4j.Logger; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity; + +public class TableExport { + private TableSpec tableSpec; + private final ReplicationSpec replicationSpec; + private final Hive db; + private final HiveConf conf; + private final Logger logger; + private final Paths paths; + private final AuthEntities authEntities = new AuthEntities(); + + public TableExport(Paths paths, TableSpec tableSpec, + ReplicationSpec replicationSpec, Hive db, HiveConf conf, Logger logger) + throws SemanticException { + this.tableSpec = (tableSpec != null + && tableSpec.tableHandle.isTemporary() + && !replicationSpec.isInReplicationScope()) + ? null + : tableSpec; + this.replicationSpec = replicationSpec; + this.db = db; + this.conf = conf; + this.logger = logger; + this.paths = paths; + } + + public AuthEntities run() throws SemanticException { + if (tableSpec == null) { + writeMetaData(null); + } else if (shouldExport()) { + if (tableSpec.tableHandle.isView()) { + replicationSpec.setIsMetadataOnly(true); + } + PartitionIterable withPartitions = partitions(); + writeMetaData(withPartitions); + if (!replicationSpec.isMetadataOnly()) { + writeData(withPartitions); + } + } + return authEntities; + } + + private PartitionIterable partitions() throws SemanticException { + try { + long currentEventId = db.getMSC().getCurrentNotificationEventId().getEventId(); + replicationSpec.setCurrentReplicationState(String.valueOf(currentEventId)); + if (tableSpec.tableHandle.isPartitioned()) { + if (tableSpec.specType == TableSpec.SpecType.TABLE_ONLY) { + // TABLE-ONLY, fetch partitions if regular export, don't if metadata-only + if (replicationSpec.isMetadataOnly()) { + return null; + } else { + return new PartitionIterable(db, tableSpec.tableHandle, null, conf.getIntVar( + HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); + } + } else { + // PARTITIONS specified - partitions inside tableSpec + return new PartitionIterable(tableSpec.partitions); + } + } else { + // Either tableHandle isn't partitioned => null, or repl-export after ts becomes null => null. + // or this is a noop-replication export, so we can skip looking at ptns. 
+ return null; + } + } catch (Exception e) { + throw new SemanticException("Error when identifying partitions", e); + } + } + + private void writeMetaData(PartitionIterable partitions) throws SemanticException { + try { + EximUtil.createExportDump( + paths.exportFileSystem, + paths.metaDataExportFile(), + tableSpec == null ? null : tableSpec.tableHandle, + partitions, + replicationSpec); + logger.debug("_metadata file written into " + paths.metaDataExportFile().toString()); + } catch (Exception e) { + // the path used above should not be used on a second try as each dump request is written to a unique location. + // however if we want to keep the dump location clean we might want to delete the paths + throw new SemanticException( + ErrorMsg.IO_ERROR.getMsg("Exception while writing out the local file"), e); + } + } + + private void writeData(PartitionIterable partitions) throws SemanticException { + try { + if (tableSpec.tableHandle.isPartitioned()) { + if (partitions == null) { + throw new IllegalStateException( + "partitions cannot be null for partitionTable :" + tableSpec.tableName); + } + for (Partition partition : partitions) { + Path fromPath = partition.getDataLocation(); + // this the data copy + Path rootDataDumpDir = paths.partitionExportDir(partition.getName()); + new FileOperations(fromPath, rootDataDumpDir, conf).export(replicationSpec); + authEntities.inputs.add(new ReadEntity(partition)); + } + } else { + Path fromPath = tableSpec.tableHandle.getDataLocation(); + //this is the data copy + new FileOperations(fromPath, paths.dataExportDir(), conf).export(replicationSpec); + authEntities.inputs.add(new ReadEntity(tableSpec.tableHandle)); + } + authEntities.outputs.add(toWriteEntity(paths.exportRootDir, conf)); + } catch (Exception e) { + throw new SemanticException(e); + } + } + + private boolean shouldExport() throws SemanticException { + if (replicationSpec.isInReplicationScope()) { + return !(tableSpec.tableHandle.isTemporary() || tableSpec.tableHandle.isNonNative()); + } else if (tableSpec.tableHandle.isNonNative()) { + throw new SemanticException(ErrorMsg.EXIM_FOR_NON_NATIVE.getMsg()); + } + return true; + } + + /** + * this class is responsible for giving various paths to be used during export along with root export + * directory creation. 
+ */ + public static class Paths { + private final ASTNode ast; + private final HiveConf conf; + public final Path exportRootDir; + private final FileSystem exportFileSystem; + + public Paths(ASTNode ast, Path dbRoot, String tblName, HiveConf conf) throws SemanticException { + this.ast = ast; + this.conf = conf; + Path tableRoot = new Path(dbRoot, tblName); + URI exportRootDir = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString()); + validateTargetDir(exportRootDir); + this.exportRootDir = new Path(exportRootDir); + try { + this.exportFileSystem = this.exportRootDir.getFileSystem(conf); + if (!exportFileSystem.exists(this.exportRootDir)) { + exportFileSystem.mkdirs(this.exportRootDir); + } + } catch (IOException e) { + throw new SemanticException(e); + } + } + + public Paths(ASTNode ast, String path, HiveConf conf) throws SemanticException { + this.ast = ast; + this.conf = conf; + this.exportRootDir = new Path(EximUtil.getValidatedURI(conf, path)); + try { + this.exportFileSystem = exportRootDir.getFileSystem(conf); + if (!exportFileSystem.exists(this.exportRootDir)) { + exportFileSystem.mkdirs(this.exportRootDir); + } + } catch (IOException e) { + throw new SemanticException(e); + } + } + + private Path partitionExportDir(String partitionName) throws SemanticException { + return exportDir(new Path(exportRootDir, partitionName)); + } + + private Path exportDir(Path exportDir) throws SemanticException { + try { + if (!exportFileSystem.exists(exportDir)) { + exportFileSystem.mkdirs(exportDir); + } + return exportDir; + } catch (IOException e) { + throw new SemanticException( + "error while creating directory for partition at " + exportDir, e); + } + } + + private Path metaDataExportFile() { + return new Path(exportRootDir, EximUtil.METADATA_NAME); + } + + /** + * This is currently referring to the export path for the data within a non partitioned table. + * Partition's data export directory is created within the export semantics of partition. + */ + private Path dataExportDir() throws SemanticException { + return exportDir(new Path(exportRootDir, EximUtil.DATA_PATH_NAME)); + } + + /** + * this level of validation might not be required as the root directory in which we dump will + * be different for each run hence possibility of it having data is not there. 
+ */ + private void validateTargetDir(URI rootDirExportFile) throws SemanticException { + try { + FileSystem fs = FileSystem.get(rootDirExportFile, conf); + Path toPath = new Path(rootDirExportFile.getScheme(), rootDirExportFile.getAuthority(), + rootDirExportFile.getPath()); + try { + FileStatus tgt = fs.getFileStatus(toPath); + // target exists + if (!tgt.isDirectory()) { + throw new SemanticException(ErrorMsg.INVALID_PATH + .getMsg(ast, "Target is not a directory : " + rootDirExportFile)); + } else { + FileStatus[] files = fs.listStatus(toPath, FileUtils.HIDDEN_FILES_PATH_FILTER); + if (files != null && files.length != 0) { + throw new SemanticException( + ErrorMsg.INVALID_PATH + .getMsg(ast, "Target is not an empty directory : " + rootDirExportFile) + ); + } + } + } catch (FileNotFoundException ignored) { + } + } catch (IOException e) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast), e); + } + } + } + + public static class AuthEntities { + public final Set inputs = new HashSet<>(); + public final Set outputs = new HashSet<>(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index 52d136fde0..72368af83b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -91,7 +91,8 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition // encoded filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) { for (String file : files) { - fileListWriter.write(file + "\n"); + fileListWriter.write(file); + fileListWriter.newLine(); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java new file mode 100644 index 0000000000..bf4fcf8910 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -0,0 +1,115 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump.io; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.ql.exec.ReplCopyTask; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; + +public class FileOperations { + private static Logger logger = LoggerFactory.getLogger(FileOperations.class); + private final Path dataFileListPath; + private final Path exportRootDataDir; + private HiveConf hiveConf; + private final FileSystem dataFileSystem, exportFileSystem; + + public FileOperations(Path dataFileListPath, Path exportRootDataDir, HiveConf hiveConf) + throws IOException { + this.dataFileListPath = dataFileListPath; + this.exportRootDataDir = exportRootDataDir; + this.hiveConf = hiveConf; + dataFileSystem = dataFileListPath.getFileSystem(hiveConf); + exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); + } + + public void export(ReplicationSpec forReplicationSpec) throws IOException, SemanticException { + if (forReplicationSpec.isLazy()) { + exportFilesAsList(); + } else { + copyFiles(); + } + } + + /** + * This writes the actual data in the exportRootDataDir from the source. + */ + private void copyFiles() throws IOException { + RemoteIterator itr = dataFileSystem.listFiles(dataFileListPath, true); + while (itr.hasNext()) { + LocatedFileStatus fileStatus = itr.next(); + if (shouldExport(fileStatus)) { + ReplCopyTask.doCopy(exportRootDataDir, exportFileSystem, fileStatus.getPath(), dataFileSystem, hiveConf); + } + } + } + + /** + * This needs the root data directory to which the data needs to be exported to. + * The data export here is a list of files either in table/partition that are written to the _files + * in the exportRootDataDir provided. 
+ */ + private void exportFilesAsList() throws SemanticException, IOException { + try (BufferedWriter writer = writer()) { + RemoteIterator itr = dataFileSystem.listFiles(dataFileListPath, true); + while (itr.hasNext()) { + LocatedFileStatus fileStatus = itr.next(); + if (shouldExport(fileStatus)) { + writer.write(encodedUri(fileStatus)); + writer.newLine(); + } + } + } + } + + private boolean shouldExport(LocatedFileStatus fileStatus) { + String name = fileStatus.getPath().getName(); + return !(fileStatus.isDirectory() || name.startsWith("_") || name.startsWith(".")); + } + + private BufferedWriter writer() throws IOException { + Path exportToFile = new Path(exportRootDataDir, EximUtil.FILES_NAME); + if (exportFileSystem.exists(exportToFile)) { + throw new IllegalArgumentException( + exportToFile.toString() + " already exists and cant export data from path(dir) " + + dataFileListPath); + } + logger.debug("exporting data files in dir : " + dataFileListPath + " to " + exportToFile); + return new BufferedWriter( + new OutputStreamWriter(exportFileSystem.create(exportToFile)) + ); + } + + private String encodedUri(LocatedFileStatus fileStatus) throws IOException { + Path currentDataFilePath = fileStatus.getPath(); + String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem); + return ReplChangeManager.encodeFileUri(currentDataFilePath.toUri().toString(), checkSum); + } +} diff --git a/ql/src/test/results/clientnegative/authorization_uri_export.q.out b/ql/src/test/results/clientnegative/authorization_uri_export.q.out index f6ed94821f..19c8115939 100644 --- a/ql/src/test/results/clientnegative/authorization_uri_export.q.out +++ b/ql/src/test/results/clientnegative/authorization_uri_export.q.out @@ -9,3 +9,4 @@ POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@export_auth_uri #### A masked pattern was here #### +FAILED: SemanticException [Error 10320]: Error while performing IO operation Exception while writing out the local file
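
A note on the assertion change in TestReplicationScenarios above: the comment in the patch points out that dump/event ids order differently when compared as strings versus as numbers. The standalone snippet below is not part of the patch; it only makes that difference concrete with the same values mentioned in the comment.

```java
public class EventIdComparisonDemo {
  public static void main(String[] args) {
    String lastTblReplDumpId = "998";
    String finalTblReplDumpId = "1001";

    // Lexicographic comparison looks at characters left to right: '1' < '9',
    // so "1001" sorts before "998" and the original string-based assertion fails.
    System.out.println(finalTblReplDumpId.compareTo(lastTblReplDumpId) > 0);                    // false

    // Numeric comparison gives the intended ordering of event/dump ids.
    System.out.println(Long.parseLong(finalTblReplDumpId) > Long.parseLong(lastTblReplDumpId)); // true
  }
}
```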
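FileOperations.export() in the new dump/io package branches on ReplicationSpec.isLazy(): a lazy dump only writes a _files manifest (one checksum-encoded URI per line via ReplChangeManager), while a non-lazy dump copies the data files through ReplCopyTask.doCopy. The sketch below mirrors that control flow with plain java.nio so it runs without a Hadoop FileSystem; it is an illustration under those assumptions, not the patch's implementation (no recursion, no checksum encoding).

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class FileExportSketch {

  /** Lazy export: list data files in an _files manifest instead of copying them. */
  static void exportFilesAsList(Path dataDir, Path exportDir) throws IOException {
    Path manifest = exportDir.resolve("_files");
    try (BufferedWriter writer = Files.newBufferedWriter(manifest);
         DirectoryStream<Path> files = Files.newDirectoryStream(dataDir)) {
      for (Path file : files) {
        if (shouldExport(file)) {
          // The real code writes a checksum-encoded URI from ReplChangeManager here.
          writer.write(file.toUri().toString());
          writer.newLine(); // platform-independent line ending, as in the patch
        }
      }
    }
  }

  /** Eager export: physically copy each data file under the export root. */
  static void copyFiles(Path dataDir, Path exportDir) throws IOException {
    try (DirectoryStream<Path> files = Files.newDirectoryStream(dataDir)) {
      for (Path file : files) {
        if (shouldExport(file)) {
          Files.copy(file, exportDir.resolve(file.getFileName().toString()),
              StandardCopyOption.REPLACE_EXISTING);
        }
      }
    }
  }

  /** Same filter as FileOperations.shouldExport: skip directories and hidden/metadata files. */
  static boolean shouldExport(Path file) {
    String name = file.getFileName().toString();
    return !(Files.isDirectory(file) || name.startsWith("_") || name.startsWith("."));
  }
}
```

A caller would pick one branch per replication spec, e.g. `if (isLazy) exportFilesAsList(...) else copyFiles(...)`, which is exactly the shape of FileOperations.export().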
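Putting TableExport.Paths, writeMetaData() and writeData() together, each table dump ends up as a small directory tree under exportRootDir. The snippet below only assembles those paths for orientation: _metadata and _files appear in the patch itself, while the literal "data" directory name and the partition directory name are assumptions (the patch refers to them via EximUtil.DATA_PATH_NAME and Partition.getName()).

```java
import org.apache.hadoop.fs.Path;

public class ExportLayoutSketch {
  public static void main(String[] args) {
    // Hypothetical table-level export root, i.e. TableExport.Paths.exportRootDir.
    Path exportRootDir = new Path("/repl/dump/mydb/mytable");

    // Table (and partition) metadata written by writeMetaData().
    Path metadata = new Path(exportRootDir, "_metadata");          // EximUtil.METADATA_NAME

    // Non-partitioned table: data, or the _files manifest when lazy, under one directory.
    Path tableData = new Path(exportRootDir, "data");              // EximUtil.DATA_PATH_NAME (assumed value)

    // Partitioned table: one sub-directory per partition, created by partitionExportDir().
    Path partitionData = new Path(exportRootDir, "part_col=2017"); // Partition.getName() (hypothetical)

    System.out.println(metadata);
    System.out.println(tableData);
    System.out.println(partitionData);
  }
}
```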