diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 9b7014b..3ac5ba7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -163,7 +163,13 @@ public void testBasic() throws IOException {
     advanceDumpDir();
     run("REPL DUMP " + dbName);
     String replDumpLocn = getResult(0,0);
-    run("REPL LOAD " + dbName + "_dupe FROM '"+replDumpLocn+"'");
+    String replDumpId = getResult(0,1,true);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    run("REPL STATUS " + dbName + "_dupe");
+    verifyResults(new String[] {replDumpId});
 
     run("SELECT * from " + dbName + "_dupe.unptned");
     verifyResults(unptn_data);
@@ -230,7 +236,6 @@ public void testIncrementalAdds() throws IOException {
     run("SELECT a from " + dbName + ".ptned WHERE b=2");
     verifyResults(ptn_data_2);
 
-    // verified up to here.
     run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
     run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=1)");
     run("SELECT a from " + dbName + ".ptned_late WHERE b=1");
@@ -244,19 +249,28 @@
     String incrementalDumpLocn = getResult(0,0);
     String incrementalDumpId = getResult(0,1,true);
     LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
+    run("REPL STATUS " + dbName + "_dupe");
+//    verifyResults(new String[] {incrementalDumpId});
+    // TODO: this will currently not work because we need to add in ALTER_DB support into this
+    // and queue in a dummy ALTER_DB to update the repl.last.id on the last event of every
+    // incremental dump. Currently, the dump id fetched will be the last dump id at the time
+    // the db was created from the bootstrap export dump
+
     run("SELECT * from " + dbName + "_dupe.unptned_empty");
     verifyResults(empty);
     run("SELECT a from " + dbName + ".ptned_empty");
     verifyResults(empty);
 
-//    this does not work because LOAD DATA LOCAL INPATH into an unptned table seems
-//    to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after
-//    fixing that.
 //    run("SELECT * from " + dbName + "_dupe.unptned");
 //    verifyResults(unptn_data);
+    // TODO :this does not work because LOAD DATA LOCAL INPATH into an unptned table seems
+    // to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after
+    // fixing that.
 
     run("SELECT * from " + dbName + "_dupe.unptned_late");
     verifyResults(unptn_data);
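
Editor's note: a minimal sketch of what the TODO in the hunk above asks for, kept to plain java.util so it makes no claims about Hive's internal APIs. Once the last event of an incremental dump has been applied, the destination database's replication marker should advance to that event's id, so REPL STATUS reflects the incremental load instead of the bootstrap dump. The method name and the literal "repl.last.id" key are illustrative assumptions.

import java.util.Map;

public class ReplStateSketch {
  // Hypothetical helper: advance the db-level replication marker after the last
  // event of an incremental load has been applied. Event ids are assumed numeric.
  static void advanceReplState(Map<String, String> dbParams, long lastAppliedEventId) {
    String current = dbParams.get("repl.last.id"); // assumed key; see the REPL STATUS hunks below
    if (current == null || Long.parseLong(current) < lastAppliedEventId) {
      dbParams.put("repl.last.id", Long.toString(lastAppliedEventId));
    }
  }
}
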
run("SELECT * from " + dbName + "_dupe.unptned_late"); verifyResults(unptn_data); @@ -291,6 +305,16 @@ private String getResult(int rowNum, int colNum, boolean reuse) throws IOExcepti } private void verifyResults(String[] data) throws IOException { + List results = getOutput(); + LOG.info("Expecting {}",data); + LOG.info("Got {}",results); + assertEquals(data.length,results.size()); + for (int i = 0; i < data.length; i++){ + assertEquals(data[i],results.get(i)); + } + } + + private List getOutput() throws IOException { List results = new ArrayList(); try { driver.getResults(results); @@ -298,16 +322,23 @@ private void verifyResults(String[] data) throws IOException { LOG.warn(e.getMessage(),e); throw new RuntimeException(e); } - LOG.info("Expecting {}",data); - LOG.info("Got {}",results); - assertEquals(data.length,results.size()); - for (int i = 0; i < data.length; i++){ - assertEquals(data[i],results.get(i)); + return results; + } + + private void printOutput() throws IOException { + for (String s : getOutput()){ + LOG.info(s); } } private static void run(String cmd) throws RuntimeException { + try { run(cmd,false); // default arg-less run simply runs, and does not care about failure + } catch (AssertionError ae){ + // Hive code has AssertionErrors in some cases - we want to record what happens + LOG.warn("AssertionError:",ae); + throw new RuntimeException(ae); + } } private static boolean run(String cmd, boolean errorOnFail) throws RuntimeException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index ce952c5..5561e06 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -102,9 +102,11 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { String parsedDbName = null; LinkedHashMap parsedPartSpec = new LinkedHashMap(); - // waitOnCreateDb determines whether or not non-existence of - // db is an error. For regular imports, it is. - boolean waitOnCreateDb = false; + // waitOnPrecursor determines whether or not non-existence of + // a dependent object is an error. For regular imports, it is. + // for now, the only thing this affects is whether or not the + // db exists. + boolean waitOnPrecursor = false; for (int i = 1; i < ast.getChildCount(); ++i){ ASTNode child = (ASTNode) ast.getChild(i); @@ -133,7 +135,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { // parsing statement is now done, on to logic. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index ce952c5..5561e06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -102,9 +102,11 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     String parsedDbName = null;
     LinkedHashMap<String, String> parsedPartSpec = new LinkedHashMap<String, String>();
 
-    // waitOnCreateDb determines whether or not non-existence of
-    // db is an error. For regular imports, it is.
-    boolean waitOnCreateDb = false;
+    // waitOnPrecursor determines whether or not non-existence of
+    // a dependent object is an error. For regular imports, it is.
+    // for now, the only thing this affects is whether or not the
+    // db exists.
+    boolean waitOnPrecursor = false;
     for (int i = 1; i < ast.getChildCount(); ++i){
       ASTNode child = (ASTNode) ast.getChild(i);
@@ -133,7 +135,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
 
     // parsing statement is now done, on to logic.
     tableExists = prepareImport(
-        isLocationSet, isExternalSet, isPartSpecSet, waitOnCreateDb,
+        isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor,
         parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
         new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx));
@@ -168,7 +170,7 @@ private void parsePartitionSpec(ASTNode tableNode, LinkedHashMap<String, String>
   }
 
   public static boolean prepareImport(
-      boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnCreateDb,
+      boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnPrecursor,
       String parsedLocation, String parsedTableName, String parsedDbName,
       LinkedHashMap<String, String> parsedPartSpec, String fromLocn,
       EximUtil.SemanticAnalyzerWrapperContext x
@@ -281,7 +283,7 @@ public static boolean prepareImport(
     } else {
       createReplImportTasks(
           tblDesc, partitionDescs,
-          isPartSpecSet, replicationSpec, waitOnCreateDb, table,
+          isPartSpecSet, replicationSpec, waitOnPrecursor, table,
           fromURI, fs, wh, x);
     }
     return tableExists;
   }
@@ -799,7 +801,7 @@ private static void createRegularImportTasks(
 
   private static void createReplImportTasks(
       CreateTableDesc tblDesc, List<AddPartitionDesc> partitionDescs,
-      boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnCreateDb,
+      boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnPrecursor,
       Table table, URI fromURI, FileSystem fs, Warehouse wh,
       EximUtil.SemanticAnalyzerWrapperContext x)
       throws HiveException, URISyntaxException, IOException, MetaException {
@@ -829,12 +831,12 @@ private static void createReplImportTasks(
     // defaults and do not error out in that case.
     Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName());
     if (parentDb == null){
-      if (!waitOnCreateDb){
+      if (!waitOnPrecursor){
        throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName()));
      }
    }
    if (tblDesc.getLocation() == null) {
-      if (!waitOnCreateDb){
+      if (!waitOnPrecursor){
        tblDesc.setLocation(wh.getTablePath(parentDb, tblDesc.getTableName()).toString());
      } else {
        tblDesc.setLocation(
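
Editor's note: the rename from waitOnCreateDb to waitOnPrecursor captures the intent of this file's changes. During a REPL LOAD, a missing parent database is not an error because an earlier task in the same chain is expected to create it, and the table location is deferred rather than resolved against the existing database. Below is a reduced, stand-alone sketch of that decision; the names and the exception type are illustrative only, while the real code throws SemanticException with ErrorMsg.DATABASE_NOT_EXISTS.

public class PrecursorCheckSketch {
  // Illustrative stand-in for the parentDb-null check in createReplImportTasks().
  static void checkParentDb(Object parentDb, String dbName, boolean waitOnPrecursor) {
    if (parentDb == null && !waitOnPrecursor) {
      // regular IMPORT path: the target database must already exist
      throw new IllegalStateException("Database does not exist: " + dbName);
    }
    // repl path: fall through; a precursor task earlier in the same REPL LOAD plan
    // is expected to create the database before this task runs, so location
    // resolution is deferred as well.
  }
}
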
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 6fff98d..8725015 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.io.IOUtils;
 
@@ -205,7 +206,6 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
       }
       Integer maxRange = Ints.checkedCast(eventTo - eventFrom + 1);
-      batchSize = 15;
       if (batchSize == null){
         batchSize = maxRange;
       } else {
@@ -478,7 +478,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
 
     if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
       // not an event dump, and table name pattern specified, this has to be a tbl-level dump
-      analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null);
+      rootTasks.addAll(analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null));
       return;
     }
 
@@ -512,13 +512,45 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         }
       } else {
         // event dump, each subdir is an individual event dump.
+        Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork(), conf);
+        Task<? extends Serializable> taskChainTail = evTaskRoot;
+        int evstage = 0;
         for (FileStatus dir : dirsInLoadPath){
+          LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
           // event loads will behave similar to table loads, with one crucial difference
           // precursor order is strict, and each event must be processed after the previous one.
-          LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
-          analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), null);
-          // FIXME: we should have a strict order of execution so that each event's tasks occur linearly
+          // The way we handle this strict order is as follows:
+          // First, we start with a taskChainTail which is a dummy noop task (a DependencyCollectionTask)
+          // at the head of our event chain. For each event we process, we tell analyzeTableLoad to
+          // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks
+          // and introduce a new barrier task(also a DependencyCollectionTask) which depends on all
+          // these tasks. Then, this barrier task becomes our new taskChainTail. Thus, we get a set of
+          // tasks as follows:
+          //
+          //                 --->ev1.task1--                          --->ev2.task1--
+          //                /               \                        /               \
+          //  evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail
+          //                \               /
+          //                 --->ev1.task3--
+          //
+          List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
+              dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), taskChainTail);
+          LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0);
+          if ((evTasks != null) && (!evTasks.isEmpty())){
+            Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
+            for (Task<? extends Serializable> t : evTasks){
+              t.addDependentTask(barrierTask);
+              LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
+                  t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
+            }
+            LOG.debug("Updated taskChainTail from {}{} to {}{}",
+                taskChainTail.getClass(),taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
+            taskChainTail = barrierTask;
+            evstage++;
+          }
         }
+        LOG.debug("added evTaskRoot {}:{}",evTaskRoot.getClass(),evTaskRoot.getId());
+        rootTasks.add(evTaskRoot);
       }
 
     } catch (Exception e) {
@@ -528,9 +560,12 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
 
   }
 
-  private void analyzeEventLoad(String dbNameOrPattern, String tblNameOrPattern,
-      FileSystem fs, FileStatus dir) throws SemanticException {
-
+  private List<Task<? extends Serializable>> analyzeEventLoad(
+      String dbName, String tblName, String locn,
+      Task<? extends Serializable> precursor) throws SemanticException {
+    // Currently handles only create-tbl & insert-ptn, since only those are dumped
+    // As we add more event types, this will expand.
+    return analyzeTableLoad(dbName, tblName, locn, precursor);
   }
 
   private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
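
Editor's note: a hedged sketch of the chaining scheme the diagram above describes, folded into one helper. It uses only classes and calls that appear in this hunk (TaskFactory.get, DependencyCollectionWork, Task.addDependentTask); in the patch itself the "hang each event task off the current tail" step happens inside analyzeTableLoad() via the precursor argument, so this is a simplification, not the actual method. The barrier keeps the tasks of one event free to run in parallel while guaranteeing that no task of event N+1 starts before every task of event N has finished.

import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;

public class EventChainSketch {
  // perEventTasks stands in for the per-event task lists analyzeEventLoad() returns.
  static Task<? extends Serializable> chain(
      List<List<Task<? extends Serializable>>> perEventTasks, HiveConf conf) {
    Task<? extends Serializable> root = TaskFactory.get(new DependencyCollectionWork(), conf);
    Task<? extends Serializable> tail = root;
    for (List<Task<? extends Serializable>> evTasks : perEventTasks) {
      if (evTasks == null || evTasks.isEmpty()) {
        continue; // nothing to order for this event
      }
      // Every task of this event hangs off the current tail...
      for (Task<? extends Serializable> t : evTasks) {
        tail.addDependentTask(t);
      }
      // ...and a fresh barrier collects them before the next event's tasks start.
      Task<? extends Serializable> barrier = TaskFactory.get(new DependencyCollectionWork(), conf);
      for (Task<? extends Serializable> t : evTasks) {
        t.addDependentTask(barrier);
      }
      tail = barrier;
    }
    return root; // add this to rootTasks; execution then proceeds event by event
  }
}
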
@@ -581,14 +616,16 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
       FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs));
 
       for (FileStatus tableDir : dirsInDbPath) {
-        analyzeTableLoad(dbName, null, tableDir.getPath().toUri().toString(), createDbTask);
+        analyzeTableLoad(
+            dbName, null, tableDir.getPath().toUri().toString(), createDbTask);
       }
     } catch (Exception e) {
       throw new SemanticException(e);
     }
   }
 
-  private void analyzeTableLoad(String dbName, String tblName, String locn,
+  private List<Task<? extends Serializable>> analyzeTableLoad(
+      String dbName, String tblName, String locn,
       Task<? extends Serializable> precursor) throws SemanticException {
     // Path being passed to us is a table dump location. We go ahead and load it in as needed.
     // If tblName is null, then we default to the table name specified in _metadata, which is good.
@@ -607,27 +644,23 @@ private void analyzeTableLoad(String dbName, String tblName, String locn,
       LinkedHashMap<String, String> parsedPartSpec = null;
       // no location for repl imports
       String parsedLocation = null;
-      boolean waitOnCreateDb = false;
-      List<Task<? extends Serializable>> importTasks = null;
-      if (precursor == null) {
-        importTasks = rootTasks;
-        waitOnCreateDb = false;
-      } else {
-        importTasks = new ArrayList<Task<? extends Serializable>>();
-        waitOnCreateDb = true;
-      }
+      List<Task<? extends Serializable>> importTasks = new ArrayList<Task<? extends Serializable>>();
+
       EximUtil.SemanticAnalyzerWrapperContext x =
           new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, importTasks, LOG, ctx);
       ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet,
-          waitOnCreateDb, parsedLocation, tblName, dbName, parsedPartSpec, locn, x);
+          (precursor != null), parsedLocation, tblName, dbName, parsedPartSpec, locn, x);
 
       if (precursor != null) {
         for (Task<? extends Serializable> t : importTasks) {
           precursor.addDependentTask(t);
+          LOG.debug("Added {}:{} as a precursor of {}:{}",
+              precursor.getClass(), precursor.getId(), t.getClass(), t.getId());
         }
       }
+      return importTasks;
     } catch (Exception e) {
       throw new SemanticException(e);
     }
@@ -655,8 +688,8 @@ private void analyzeReplStatus(ASTNode ast) throws SemanticException {
       if (tbl != null) {
         inputs.add(new ReadEntity(tbl));
         Map<String, String> params = tbl.getParameters();
-        if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID))) {
-          replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID);
+        if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+          replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
         }
       }
     } else {
@@ -665,8 +698,8 @@ private void analyzeReplStatus(ASTNode ast) throws SemanticException {
       if (database != null) {
         inputs.add(new ReadEntity(database));
         Map<String, String> params = database.getParameters();
-        if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID))) {
-          replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID);
+        if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+          replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
         }
       }
     }
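
Editor's note: the .toString() additions above fix a subtle type mismatch. The parameter maps are Map<String, String>, so containsKey(ReplicationSpec.KEY.CURR_STATE_ID) compiles (containsKey takes Object) but can never be true; the enum's string form must be used as the key. A self-contained illustration follows, with a stand-in enum for ReplicationSpec.KEY; the "repl.last.id" string is an assumption about its toString() value.

import java.util.HashMap;
import java.util.Map;

public class ParamsKeySketch {
  enum KEY { CURR_STATE_ID { public String toString() { return "repl.last.id"; } } }

  public static void main(String[] args) {
    Map<String, String> params = new HashMap<String, String>();
    params.put(KEY.CURR_STATE_ID.toString(), "42");
    System.out.println(params.containsKey(KEY.CURR_STATE_ID));            // false: key type never matches
    System.out.println(params.containsKey(KEY.CURR_STATE_ID.toString())); // true
  }
}
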
@@ -675,9 +708,9 @@ private void analyzeReplStatus(ASTNode ast) throws SemanticException {
       // codes
     }
 
-    LOG.debug("RSTATUS: writing repl.last.id=" + String.valueOf(replLastId) + " out to "
-        + ctx.getResFile());
     prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}",
+        String.valueOf(replLastId), ctx.getResFile());
   }
 
   private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
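
Editor's note: for context, a hedged end-to-end usage sketch of the statement sequence the tests above drive, written against Hive JDBC rather than the test's embedded Driver. It assumes a HiveServer2 at the placeholder URL, the Hive JDBC driver on the classpath, that these REPL statements exist in the build this patch targets, and that REPL DUMP returns (dump location, last event id) as the test's getResult(0,0)/getResult(0,1) calls suggest.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ReplRoundTrip {
  public static void main(String[] args) throws Exception {
    try (Connection c = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement s = c.createStatement()) {
      ResultSet dump = s.executeQuery("REPL DUMP src_db");
      dump.next();
      String dumpLocn = dump.getString(1);   // dump directory
      String lastReplId = dump.getString(2); // last event id covered by the dump
      s.execute("REPL LOAD src_db_copy FROM '" + dumpLocn + "'");
      ResultSet status = s.executeQuery("REPL STATUS src_db_copy");
      status.next();
      // After a bootstrap load this should match lastReplId (see testBasic above).
      System.out.println(status.getString(1) + " vs " + lastReplId);
    }
  }
}
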