diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 3420efd..c486180 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -172,7 +172,7 @@ public static boolean prepareImport(
       String parsedLocation, String parsedTableName, String parsedDbName,
       LinkedHashMap<String, String> parsedPartSpec,
       String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x
-  ) throws IOException, MetaException, HiveException, URISyntaxException {
+  ) throws IOException, MetaException, HiveException {
 
     // initialize load path
     URI fromURI = EximUtil.getValidatedURI(x.getConf(), stripQuotes(fromLocn));
@@ -491,7 +491,7 @@ private static String partSpecToString(Map<String, String> partSpec) {
   }
 
   private static void checkTable(Table table, CreateTableDesc tableDesc,
       ReplicationSpec replicationSpec, HiveConf conf)
-      throws SemanticException, URISyntaxException {
+      throws SemanticException {
     // This method gets called only in the scope that a destination table already exists, so
     // we're validating if the table is an appropriate destination to import into
@@ -717,7 +717,7 @@ private static void createRegularImportTasks(
       boolean isPartSpecSet, ReplicationSpec replicationSpec,
       Table table, URI fromURI, FileSystem fs, Warehouse wh,
       EximUtil.SemanticAnalyzerWrapperContext x)
-      throws HiveException, URISyntaxException, IOException, MetaException {
+      throws HiveException, IOException, MetaException {
 
     if (table != null){
       if (table.isPartitioned()) {
@@ -792,7 +792,7 @@ private static void createReplImportTasks(
       boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnCreateDb,
       Table table, URI fromURI, FileSystem fs, Warehouse wh,
       EximUtil.SemanticAnalyzerWrapperContext x)
-      throws HiveException, URISyntaxException, IOException, MetaException {
+      throws HiveException, IOException, MetaException {
 
     Task dr = null;
     WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 938355e..cf4b779 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -22,15 +22,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -43,6 +36,7 @@
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.thrift.TException;
 
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
@@ -142,7 +136,6 @@ private void initReplDump(ASTNode ast) {
 
   // REPL DUMP
   private void analyzeReplDump(ASTNode ast) throws SemanticException {
-    // FIXME: support non-bootstrap: use eventFrom/eventTo/batchSize
     LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern)
         + "." + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom)
         + " to " + String.valueOf(eventTo) + " batchsize " + String.valueOf(batchSize));
@@ -158,13 +151,12 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
           dumpTbl(ast, dbName, tblName, dbRoot);
         }
       }
-      String currentReplId =
-          String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+      String currentReplId;
+      currentReplId = String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
       prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId),
           "dump_dir,last_repl_id#string,string");
-    } catch (Exception e) {
-      // TODO : simple wrap & rethrow for now, clean up with error codes
-      throw new SemanticException(e);
+    } catch (HiveException | TException e) {
+      throw new SemanticException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
   }
 
@@ -176,9 +168,6 @@ String getNextDumpDir() {
       // repl dump will clash with prior dumps, and thus have to clean up properly.
     } else {
       return String.valueOf(System.currentTimeMillis());
-      // TODO: time good enough for now - we'll likely improve this.
-      // We may also work in something the equivalent of pid, thrid and move to nanos to ensure
-      // uniqueness.
     }
   }
 
@@ -192,13 +181,16 @@ String getNextDumpDir() {
   private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticException {
     Path dbRoot = new Path(dumpRoot, dbName);
     try {
-      // TODO : instantiating FS objects are generally costly. Refactor
       FileSystem fs = dbRoot.getFileSystem(conf);
       Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
-      Database dbObj = db.getDatabase(dbName);
+      Database dbObj;
+      try {
+        dbObj = db.getDatabase(dbName);
+      } catch (HiveException e) {
+        throw new SemanticException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+      }
       EximUtil.createDbExportDump(fs, dumpPath, dbObj, getNewReplicationSpec());
-    } catch (Exception e) {
-      // TODO : simple wrap & rethrow for now, clean up with error codes
+    } catch (IOException e) {
       throw new SemanticException(e);
     }
     return dbRoot;
@@ -213,7 +205,8 @@ private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticExcepti
    * @return tbl dumped path
    * @throws SemanticException
    */
-  private Path dumpTbl(ASTNode ast, String dbName, String tblName, Path dbRoot) throws SemanticException {
+  private Path dumpTbl(ASTNode ast, String dbName, String tblName, Path dbRoot)
+      throws SemanticException {
     Path tableRoot = new Path(dbRoot, tblName);
     try {
       URI toURI = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
@@ -221,8 +214,7 @@ private Path dumpTbl(ASTNode ast, String dbName, String tblName, Path dbRoot) th
       ExportSemanticAnalyzer.prepareExport(ast, toURI, ts, getNewReplicationSpec(), db, conf,
           ctx, rootTasks, inputs, outputs, LOG);
     } catch (HiveException e) {
-      // TODO : simple wrap & rethrow for now, clean up with error codes
-      throw new SemanticException(e);
+      throw new SemanticException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
     return tableRoot;
   }
@@ -255,12 +247,10 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     LOG.debug("ReplSemanticAnalyzer.analyzeReplLoad: " + String.valueOf(dbNameOrPattern) + "."
         + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(path));
 
-    // for analyze repl load, we walk through the dir structure available in the path,
+    // For analyze repl load, we walk through the dir structure available in the path,
     // looking at each db, and then each table, and then setting up the appropriate
     // import job in its place.
-
-    // FIXME : handle non-bootstrap cases.
-
     // We look at the path, and go through each subdir.
     // Each subdir corresponds to a database.
     // For each subdir, there is a _metadata file which allows us to re-impress the db object
@@ -268,15 +258,12 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     // that we had an IMPORT on each of them, into this db.
 
     try {
-
       Path loadPath = new Path(path);
       final FileSystem fs = loadPath.getFileSystem(conf);
-
       if (!fs.exists(loadPath)) {
         // supposed dump path does not exist.
         throw new FileNotFoundException(loadPath.toUri().toString());
       }
-
       // Now, the dumped path can be one of two things:
       // a) It can be a db dump, in which case we expect a set of dirs, each with a
       //    db name, and with a _metadata file in each, and table dirs inside that.
@@ -292,18 +279,14 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null);
         return;
       }
-
       FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath);
       if (srcs == null || (srcs.length == 0)) {
         throw new FileNotFoundException(loadPath.toUri().toString());
       }
-
       FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs));
-
       if ((dirsInLoadPath == null) || (dirsInLoadPath.length == 0)) {
         throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString());
       }
-
       if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
         LOG.debug("Found multiple dirs when we expected 1:");
         for (FileStatus d : dirsInLoadPath) {
@@ -314,16 +297,12 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
             + loadPath.toUri().toString()
             + " does not correspond to REPL LOAD expecting to load to a singular destination point.");
       }
-
       for (FileStatus dir : dirsInLoadPath) {
         analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
       }
-
-    } catch (Exception e) {
-      // TODO : simple wrap & rethrow for now, clean up with error codes
+    } catch (IOException e) {
       throw new SemanticException(e);
     }
-
   }
 
   private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
@@ -332,51 +311,40 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
       // Path being passed to us is a db dump location. We go ahead and load as needed.
       // dbName might be null or empty, in which case we keep the original db name for the new
       // database creation
-
       // Two steps here - first, we read the _metadata file here, and create a CreateDatabaseDesc
       // associated with that
       // Then, we iterate over all subdirs, and create table imports for each.
-
       EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
       try {
         rv = EximUtil.readMetaData(fs, new Path(dir.getPath(), EximUtil.METADATA_NAME));
       } catch (IOException e) {
         throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
       }
-
       Database dbObj = rv.getDatabase();
-
       if (dbObj == null) {
         throw new IllegalArgumentException(
             "_metadata file read did not contain a db object - invalid dump.");
       }
-
       if ((dbName == null) || (dbName.isEmpty())) {
         // We use dbName specified as long as it is not null/empty. If so, then we use the original
-        // name
-        // recorded in the thrift object.
+        // name recorded in the thrift object.
         dbName = dbObj.getName();
       }
-
       CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc();
       createDbDesc.setName(dbName);
       createDbDesc.setComment(dbObj.getDescription());
       createDbDesc.setDatabaseProperties(dbObj.getParameters());
       // note that we do not set location - for repl load, we want that auto-created.
-
       createDbDesc.setIfNotExists(false);
-      // If it exists, we want this to be an error condition. Repl Load is not intended to replace a
-      // db.
-      // TODO: we might revisit this in create-drop-recreate cases, needs some thinking on.
-      Task createDbTask = TaskFactory.get(new DDLWork(inputs, outputs, createDbDesc), conf);
+      // If it exists, we want this to be an error condition. Repl Load is not intended to replace a db
+      Task createDbTask =
+          TaskFactory.get(new DDLWork(inputs, outputs, createDbDesc), conf);
       rootTasks.add(createDbTask);
-
       FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs));
-
       for (FileStatus tableDir : dirsInDbPath) {
         analyzeTableLoad(dbName, null, tableDir.getPath().toUri().toString(), createDbTask);
       }
-    } catch (Exception e) {
+    } catch (IOException e) {
       throw new SemanticException(e);
     }
   }
@@ -389,41 +357,41 @@ private void analyzeTableLoad(String dbName, String tblName, String locn,
     if (dbName == null || dbName.isEmpty()) {
       throw new SemanticException("Database name cannot be null for a table load");
     }
+    // no location set on repl loads
+    boolean isLocationSet = false;
+    // all repl imports are non-external
+    boolean isExternalSet = false;
+    // bootstrap loads are not partition level
+    boolean isPartSpecSet = false;
+    // repl loads are not partition level
+    LinkedHashMap<String, String> parsedPartSpec = null;
+    // no location for repl imports
+    String parsedLocation = null;
+    boolean waitOnCreateDb = false;
+    List<Task<? extends Serializable>> importTasks = null;
+    if (precursor == null) {
+      importTasks = rootTasks;
+      waitOnCreateDb = false;
+    } else {
+      importTasks = new ArrayList<Task<? extends Serializable>>();
+      waitOnCreateDb = true;
+    }
+    EximUtil.SemanticAnalyzerWrapperContext x =
+        new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, importTasks, LOG,
+            ctx);
     try {
-      // no location set on repl loads
-      boolean isLocationSet = false;
-      // all repl imports are non-external
-      boolean isExternalSet = false;
-      // bootstrap loads are not partition level
-      boolean isPartSpecSet = false;
-      // repl loads are not partition level
-      LinkedHashMap<String, String> parsedPartSpec = null;
-      // no location for repl imports
-      String parsedLocation = null;
-      boolean waitOnCreateDb = false;
-      List<Task<? extends Serializable>> importTasks = null;
-      if (precursor == null) {
-        importTasks = rootTasks;
-        waitOnCreateDb = false;
-      } else {
-        importTasks = new ArrayList<Task<? extends Serializable>>();
-        waitOnCreateDb = true;
-      }
-      EximUtil.SemanticAnalyzerWrapperContext x =
-          new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, importTasks, LOG,
-              ctx);
       ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet,
           waitOnCreateDb, parsedLocation, tblName, dbName, parsedPartSpec, locn, x);
-
-      if (precursor != null) {
-        for (Task t : importTasks) {
-          precursor.addDependentTask(t);
-        }
-      }
-
-    } catch (Exception e) {
+    } catch (HiveException | MetaException e) {
+      throw new SemanticException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    } catch (IOException e) {
       throw new SemanticException(e);
     }
+    if (precursor != null) {
+      for (Task t : importTasks) {
+        precursor.addDependentTask(t);
+      }
+    }
   }
 
   // REPL STATUS
@@ -438,9 +406,7 @@ private void initReplStatus(ASTNode ast) {
   private void analyzeReplStatus(ASTNode ast) throws SemanticException {
     LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: " + String.valueOf(dbNameOrPattern)
         + "." + String.valueOf(tblNameOrPattern));
-
     String replLastId = null;
-
     try {
       if (tblNameOrPattern != null) {
         // Checking for status of table
@@ -464,10 +430,8 @@ private void analyzeReplStatus(ASTNode ast) throws SemanticException {
         }
       }
     } catch (HiveException e) {
-      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
-                                      // codes
+      throw new SemanticException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
-
     LOG.debug("RSTATUS: writing repl.last.id=" + String.valueOf(replLastId) + " out to "
         + ctx.getResFile());
     prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
@@ -478,14 +442,7 @@ private void prepareReturnValues(List<String> values, String schema) throws Sema
     for (String s : values) {
       LOG.debug(" > " + s);
     }
-
     ctx.setResFile(ctx.getLocalTmpPath());
-    // FIXME : this should not accessible by the user if we write to it from the frontend.
-    // Thus, we should Desc/Work this, otherwise there is a security issue here.
-    // Note: if we don't call ctx.setResFile, we get a NPE from the following code section
-    // If we do call it, then FetchWork thinks that the "table" here winds up thinking that
-    // this is a partitioned dir, which does not work. Thus, this does not work.
-
     writeOutput(values);
   }
 
@@ -503,11 +460,9 @@ private void writeOutput(List<String> values) throws SemanticException {
       }
       outStream.write(Utilities.newLineCode);
     } catch (IOException e) {
-      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
-                                      // codes
+      throw new SemanticException(e);
     } finally {
-      IOUtils.closeStream(outStream); // TODO : we have other closes here, and in ReplCopyTask -
-                                      // replace with this
+      IOUtils.closeStream(outStream);
     }
   }
 
@@ -518,9 +473,8 @@ private ReplicationSpec getNewReplicationSpec() throws SemanticException {
       replicationSpec.setCurrentReplicationState(String.valueOf(db.getMSC()
          .getCurrentNotificationEventId().getEventId()));
       return replicationSpec;
-    } catch (Exception e) {
-      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
-                                      // codes
+    } catch (TException e) {
+      throw new SemanticException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
   }