diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index 3ac5ba7..d2696be 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -21,11 +21,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.util.Shell; +import org.apache.thrift.TException; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -41,6 +45,8 @@ import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; public class TestReplicationScenarios { @@ -54,6 +60,7 @@ static boolean useExternalMS = false; static int msPort; static Driver driver; + static HiveMetaStoreClient metaStoreClient; protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); private ArrayList lastResults; @@ -91,6 +98,7 @@ public static void setUpBeforeClass() throws Exception { driver = new Driver(hconf); SessionState.start(new CliSessionState(hconf)); + metaStoreClient = new HiveMetaStoreClient(hconf); } @AfterClass @@ -285,6 +293,114 @@ public void testIncrementalAdds() throws IOException { verifyResults(ptn_data_1); run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2"); verifyResults(ptn_data_2); + + } + + @Test + public void testDrops() throws IOException { + + String testName = "drops"; + LOG.info("Testing "+testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE"); + + String[] unptn_data = new String[]{ "eleven" , "twelve" }; + String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"}; + String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"}; + String[] empty = new String[]{}; + + String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath(); + + createTestDataFile(unptn_locn, unptn_data); + createTestDataFile(ptn_locn_1, ptn_data_1); + createTestDataFile(ptn_locn_2, ptn_data_2); + + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); + run("SELECT * from " + dbName + ".unptned"); + verifyResults(unptn_data); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')"); + run("SELECT a from " + dbName + ".ptned WHERE b='1'"); + verifyResults(ptn_data_1); + run("LOAD DATA LOCAL INPATH 
'" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')"); + run("SELECT a from " + dbName + ".ptned WHERE b='2'"); + verifyResults(ptn_data_2); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')"); + run("SELECT a from " + dbName + ".ptned2 WHERE b='1'"); + verifyResults(ptn_data_1); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')"); + run("SELECT a from " + dbName + ".ptned2 WHERE b='2'"); + verifyResults(ptn_data_2); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0,0); + String replDumpId = getResult(0,1,true); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + run("REPL STATUS " + dbName + "_dupe"); + verifyResults(new String[] {replDumpId}); + + run("SELECT * from " + dbName + "_dupe.unptned"); + verifyResults(unptn_data); + run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'"); + verifyResults(ptn_data_1); + run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'"); + verifyResults(ptn_data_2); + run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'"); + verifyResults(ptn_data_1); + run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'"); + verifyResults(ptn_data_2); + + run("DROP TABLE " + dbName + ".unptned"); + run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')"); + run("DROP TABLE " + dbName + ".ptned2"); + run("SELECT a from " + dbName + ".ptned WHERE b=2"); + verifyResults(empty); + run("SELECT a from " + dbName + ".ptned"); + verifyResults(ptn_data_1); + + advanceDumpDir();; + run("REPL DUMP " + dbName + " FROM " + replDumpId); + String postDropReplDumpLocn = getResult(0,0); + String postDropReplDumpId = getResult(0,1,true); + LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'"); + + Exception e = null; + try { + Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned"); + assertNull(tbl); + } catch (TException te) { + e = te; + } + assertNotNull(e); + assertEquals(NoSuchObjectException.class, e.getClass()); + + run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2"); + verifyResults(empty); + run("SELECT a from " + dbName + "_dupe.ptned"); + verifyResults(ptn_data_1); + + Exception e2 = null; + try { + Table tbl = metaStoreClient.getTable(dbName+"_dupe","ptned2"); + assertNull(tbl); + } catch (TException te) { + e2 = te; + } + assertNotNull(e2); + assertEquals(NoSuchObjectException.class, e.getClass()); + } private String getResult(int rowNum, int colNum) throws IOException { diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index c57f577..17e7686 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -221,7 +221,11 @@ static String createIndexObjJson(Index indexObj) throws TException { } public static ObjectNode getJsonTree(NotificationEvent event) throws Exception { - JsonParser jsonParser = (new 
JsonFactory()).createJsonParser(event.getMessage()); + return getJsonTree(event.getMessage()); + } + + public static ObjectNode getJsonTree(String eventMessage) throws Exception { + JsonParser jsonParser = (new JsonFactory()).createJsonParser(eventMessage); ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(jsonParser, ObjectNode.class); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 3f58130..104b397 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -3132,7 +3132,7 @@ private void analyzeMetastoreCheck(CommonTree ast) throws SemanticException { return result; } - private static ExprNodeGenericFuncDesc makeBinaryPredicate( + public static ExprNodeGenericFuncDesc makeBinaryPredicate( String fn, ExprNodeDesc left, ExprNodeDesc right) throws SemanticException { return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, FunctionRegistry.getFunctionInfo(fn).getGenericUDF(), Lists.newArrayList(left, right)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 8725015..69ccda7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -27,13 +27,17 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; import org.apache.hadoop.hive.metastore.messaging.EventUtils; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.DDLTask; import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -45,18 +49,28 @@ import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; +import org.apache.hadoop.hive.ql.plan.DropTableDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.IOUtils; import javax.annotation.Nullable; +import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStreamReader; import java.io.Serializable; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import 
java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -77,6 +91,122 @@ private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; + public static final String DUMPMETADATA = "_dumpmetadata"; + + public enum DUMPTYPE { + BOOTSTRAP("BOOTSTRAP"), + INCREMENTAL("INCREMENTAL"), + EVENT_CREATE_TABLE("EVENT_CREATE_TABLE"), + EVENT_ADD_PARTITION("EVENT_ADD_PARTITION"), + EVENT_DROP_TABLE("EVENT_DROP_TABLE"), + EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"), + EVENT_UNKNOWN("EVENT_UNKNOWN"); + + String type = null; + DUMPTYPE(String type) { + this.type = type; + } + + @Override + public String toString(){ + return type; + } + + }; + + public class DumpMetaData { + // wrapper class for reading and writing metadata about a dump + // responsible for _dumpmetadata files + + private DUMPTYPE dumpType; + private Long eventFrom = null; + private Long eventTo = null; + private String payload = null; + private boolean initialized = false; + + private final Path dumpRoot; + private final Path dumpFile; + + public DumpMetaData(Path dumpRoot) { + this.dumpRoot = dumpRoot; + dumpFile = new Path(dumpRoot, DUMPMETADATA); + } + + public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo){ + this(dumpRoot); + setDump(lvl,eventFrom,eventTo); + } + + public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo){ + this.dumpType = lvl; + this.eventFrom = eventFrom; + this.eventTo = eventTo; + this.initialized = true; + } + + public void loadDumpFromFile() throws SemanticException { + try { + // read from dumpfile and instantiate self + FileSystem fs = dumpFile.getFileSystem(conf); + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile))); + String line = null; + if ( (line = br.readLine()) != null){ + String[] lineContents = line.split("\t",4); + setDump(DUMPTYPE.valueOf(lineContents[0]),Long.valueOf(lineContents[1]),Long.valueOf(lineContents[2])); + setPayload(lineContents[3].equals(Utilities.nullStringOutput) ? 
null : lineContents[3]); + } else { + throw new IOException("Unable to read valid values from dumpFile:"+dumpFile.toUri().toString()); + } + } catch (IOException ioe){ + throw new SemanticException(ioe); + } + } + + public DUMPTYPE getDumpType() throws SemanticException { + initializeIfNot(); + return this.dumpType; + } + + public String getPayload() throws SemanticException { + initializeIfNot(); + return this.payload; + } + + public void setPayload(String payload) { + this.payload = payload; + } + + public Long getEventFrom() throws SemanticException { + initializeIfNot(); + return eventFrom; + } + + public Long getEventTo() throws SemanticException { + initializeIfNot(); + return eventTo; + } + + public Path getDumpFilePath() { + return dumpFile; + } + + public boolean isIncrementalDump() throws SemanticException { + initializeIfNot(); + return (this.dumpType == DUMPTYPE.INCREMENTAL); + } + + private void initializeIfNot() throws SemanticException { + if (!initialized){ + loadDumpFromFile(); + } + } + + public void write() throws SemanticException { + writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(), payload), dumpFile); + } + + } + public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); } @@ -155,13 +285,12 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { + String.valueOf(eventTo) + " batchsize " + String.valueOf(batchSize)); String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR); Path dumpRoot = new Path(replRoot, getNextDumpDir()); - Path dumpMetadata = new Path(dumpRoot,"_dumpmetadata"); - String lastReplId; + DumpMetaData dmd = new DumpMetaData(dumpRoot); + Long lastReplId; try { if (eventFrom == null){ // bootstrap case - String bootDumpBeginReplId = - String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId()); + Long bootDumpBeginReplId = db.getMSC().getCurrentNotificationEventId().getEventId(); for (String dbName : matchesDb(dbNameOrPattern)) { LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName); Path dbRoot = dumpDbMetadata(dbName, dumpRoot); @@ -171,8 +300,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { dumpTbl(ast, dbName, tblName, dbRoot); } } - String bootDumpEndReplId = - String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId()); + Long bootDumpEndReplId = db.getMSC().getCurrentNotificationEventId().getEventId(); LOG.info("Bootstrap object dump phase took from {} to {}",bootDumpBeginReplId, bootDumpEndReplId); // Now that bootstrap has dumped all objects related, we have to account for the changes @@ -184,8 +312,8 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { EventUtils.MSClientNotificationFetcher evFetcher = new EventUtils.MSClientNotificationFetcher(db.getMSC()); EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( - evFetcher, Long.valueOf(bootDumpBeginReplId), - Ints.checkedCast(Long.valueOf(bootDumpEndReplId) - Long.valueOf(bootDumpBeginReplId) + 1), + evFetcher, bootDumpBeginReplId, + Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1, evFilter ); // Now we consolidate all the events that happenned during the objdump into the objdump @@ -194,7 +322,10 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId())); // FIXME : implement consolidateEvent(..) 
similar to dumpEvent(ev,evRoot) } - LOG.info("Consolidation done, preparing to return {},{}",dumpRoot.toUri(),bootDumpEndReplId); + LOG.info("Consolidation done, preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); + dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId); + dmd.write(); + // Set the correct last repl id to return to the user lastReplId = bootDumpEndReplId; } else { @@ -230,13 +361,14 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { dumpEvent(ev,evRoot); } - LOG.info("Done dumping events, preparing to return {},{}",dumpRoot.toUri(),eventTo); - List vals; - writeOutput(Arrays.asList("event", String.valueOf(eventFrom), String.valueOf(eventTo)), dumpMetadata); + LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), eventTo); + writeOutput(Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(eventTo)), dmd.getDumpFilePath()); + dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo); + dmd.write(); // Set the correct last repl id to return to the user - lastReplId = String.valueOf(eventTo); + lastReplId = eventTo; } - prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), lastReplId), dumpSchema); + prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema); setFetchTask(createFetchTask(dumpSchema)); } catch (Exception e) { // TODO : simple wrap & rethrow for now, clean up with error codes @@ -248,7 +380,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception { long evid = ev.getEventId(); String evidStr = String.valueOf(evid); - ReplicationSpec replicationSpec = getNewReplicationSpec(evidStr, evidStr); + ReplicationSpec replicationSpec = getNewEventOnlyReplicationSpec(evidStr); MessageDeserializer md = MessageFactory.getInstance().getDeserializer(); switch (ev.getEventType()){ case MessageFactory.CREATE_TABLE_EVENT : { @@ -278,7 +410,7 @@ private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception { // we will, however, do so here, now, for dev/debug's sake. 
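+      // (The copy task below stages the table data under evRoot/data, and the _dumpmetadata marker
+      // written at the end of this case tags the dir as EVENT_CREATE_TABLE so REPL LOAD can replay it.)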
Path dataPath = new Path(evRoot,"data"); rootTasks.add(ReplCopyTask.getDumpCopyTask(replicationSpec, qlMdTable.getPath(), dataPath , conf)); - + (new DumpMetaData(evRoot,DUMPTYPE.EVENT_CREATE_TABLE,evid,evid)).write(); break; } case MessageFactory.ADD_PARTITION_EVENT : { @@ -334,11 +466,29 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition replicationSpec, qlPtn.getPartitionPath(), ptnDataPath, conf)); } + (new DumpMetaData(evRoot,DUMPTYPE.EVENT_ADD_PARTITION,evid,evid)).write(); + break; + } + case MessageFactory.DROP_TABLE_EVENT : { + LOG.info("Processing#{} DROP_TABLE message : {}",ev.getEventId(),ev.getMessage()); + DumpMetaData dmd = new DumpMetaData(evRoot,DUMPTYPE.EVENT_DROP_TABLE,evid,evid); + dmd.setPayload(ev.getMessage()); + dmd.write(); + break; + } + case MessageFactory.DROP_PARTITION_EVENT : { + LOG.info("Processing#{} DROP_PARTITION message : {}",ev.getEventId(),ev.getMessage()); + DumpMetaData dmd = new DumpMetaData(evRoot,DUMPTYPE.EVENT_DROP_PARTITION,evid,evid); + dmd.setPayload(ev.getMessage()); + dmd.write(); break; } + // TODO : handle other event types default: - LOG.info("Skipping processing#{} message : {}",ev.getEventId(), ev.getMessage()); - // TODO : handle other event types + LOG.info("Dummy processing#{} message : {}",ev.getEventId(), ev.getMessage()); + DumpMetaData dmd = new DumpMetaData(evRoot,DUMPTYPE.EVENT_UNKNOWN,evid,evid); + dmd.setPayload(ev.getMessage()); + dmd.write(); break; } @@ -426,14 +576,44 @@ private void initReplLoad(ASTNode ast) { /* * Example dump dirs we need to be able to handle : * - * for: hive.repl.rootdir = staging/ Then, repl dumps will be created in staging/ + * for: hive.repl.rootdir = staging/ + * Then, repl dumps will be created in staging/ * - * single-db-dump: staging/blah12345 blah12345/ default/ _metadata tbl1/ _metadata dt=20160907/ - * _files tbl2/ tbl3/ unptn_tbl/ _metadata _files + * single-db-dump: staging/blah12345 will contain a db dir for the db specified + * blah12345/ + * default/ + * _metadata + * tbl1/ + * _metadata + * dt=20160907/ + * _files + * tbl2/ + * tbl3/ + * unptn_tbl/ + * _metadata + * _files * - * multi-db-dump: staging/bar12347 staging/ bar12347/ default/ ... sales/ ... + * multi-db-dump: staging/bar12347 will contain dirs for each db covered + * staging/ + * bar12347/ + * default/ + * ... + * sales/ + * ... * - * single table-dump: staging/baz123 staging/ baz123/ _metadata dt=20150931/ _files + * single table-dump: staging/baz123 will contain a table object dump inside + * staging/ + * baz123/ + * _metadata + * dt=20150931/ + * _files + * + * incremental dump : staging/blue123 will contain dirs for each event inside. + * staging/ + * blue123/ + * 34/ + * 35/ + * 36/ */ private void analyzeReplLoad(ASTNode ast) throws SemanticException { LOG.debug("ReplSemanticAnalyzer.analyzeReplLoad: " + String.valueOf(dbNameOrPattern) + "." @@ -458,22 +638,24 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { // db name, and with a _metadata file in each, and table dirs inside that. // b) It can be a table dump dir, in which case we expect a _metadata dump of // a table in question in the dir, and individual ptn dir hierarchy. - // c) A dump can be an event-level dump, which means we have several subdirs + // c) A dump can be an incremental dump, which means we have several subdirs // each of which have the evid as the dir name, and each of which correspond // to a event-level dump. 
Currently, only CREATE_TABLE and ADD_PARTITION are
     // handled, so all of these dumps will be at a table/ptn level.
-    // For incremental repl, eventually, we can have individual events which can
+    // For incremental repl, we will have individual events which can
     // be other things like roles and fns as well.
+    // At this point, all dump dirs should contain a _dumpmetadata file that
+    // tells us what is inside that dumpdir.
+
+    DumpMetaData dmd = new DumpMetaData(loadPath);
     boolean evDump = false;
-    Path dumpMetadata = new Path(loadPath, "_dumpmetadata");
-    // TODO : only event dumps currently have _dumpmetadata - this might change. Generify.
-    if (fs.exists(dumpMetadata)){
-      LOG.debug("{} exists, this is a event dump", dumpMetadata);
+    if (dmd.isIncrementalDump()){
+      LOG.debug("{} contains an incremental dump", loadPath);
       evDump = true;
     } else {
-      LOG.debug("{} does not exist, this is an object dump", dumpMetadata);
+      LOG.debug("{} contains a bootstrap dump", loadPath);
     }
 
     if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
@@ -533,6 +715,9 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       //          \   /
       //           --->ev1.task3--
       //
+      // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the
+      // entire chain
+
       List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
           dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), taskChainTail);
       LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0);
@@ -562,10 +747,98 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
   private List<Task<? extends Serializable>> analyzeEventLoad(
       String dbName, String tblName, String locn,
-      Task<? extends Serializable> precursor ) throws SemanticException {
+      Task<? extends Serializable> precursor) throws SemanticException {
     // Currently handles only create-tbl & insert-ptn, since only those are dumped
     // As we add more event types, this will expand.
-    return analyzeTableLoad(dbName, tblName, locn, precursor);
+    DumpMetaData dmd = new DumpMetaData(new Path(locn));
+    switch (dmd.getDumpType()) {
+      case EVENT_CREATE_TABLE: {
+        return analyzeTableLoad(dbName, tblName, locn, precursor);
+      }
+      case EVENT_ADD_PARTITION: {
+        return analyzeTableLoad(dbName, tblName, locn, precursor);
+      }
+      case EVENT_DROP_TABLE: {
+        MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
+        DropTableMessage dropTableMessage = md.getDropTableMessage(dmd.getPayload());
+        DropTableDesc dropTableDesc = new DropTableDesc(
+            dbName + "." + (tblName == null ? dropTableMessage.getTable() : tblName),
+            null, true, true, getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
+        Task<DDLWork> dropTableTask = TaskFactory.get(new DDLWork(inputs, outputs, dropTableDesc), conf);
+        if (precursor != null){
+          precursor.addDependentTask(dropTableTask);
+        }
+        List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+        tasks.add(dropTableTask);
+        LOG.debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
+        return tasks;
+      }
+      case EVENT_DROP_PARTITION: {
+        MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
+        DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(dmd.getPayload());
+        Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = genPartSpecs(dropPartitionMessage.getPartitions());
+        if (partSpecs.size() > 0){
+          DropTableDesc dropPtnDesc = new DropTableDesc(
+              dbName + "." + (tblName == null ? dropPartitionMessage.getTable() : tblName), partSpecs,
+              null, true, getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
+          Task<DDLWork> dropPtnTask = TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), conf);
+          if (precursor != null){
+            precursor.addDependentTask(dropPtnTask);
+          }
+          List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+          tasks.add(dropPtnTask);
+          LOG.debug("Added drop ptn task : {}:{},{}",
+              dropPtnTask.getId(), dropPtnDesc.getTableName(), dropPartitionMessage.getPartitions());
+          return tasks;
+        } else {
+          throw new SemanticException("DROP PARTITION EVENT does not return any part descs for event message :"+dmd.getPayload());
+        }
+      }
+      case EVENT_UNKNOWN: {
+        break;
+      }
+      default: {
+        break;
+      }
+    }
+    return null;
+  }
+
+  private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(List<Map<String, String>> partitions) throws SemanticException {
+    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<Integer, List<ExprNodeGenericFuncDesc>>();
+
+    int partPrefixLength = 0;
+    if ((partitions != null) && (partitions.size() > 0)) {
+      partPrefixLength = partitions.get(0).size();
+      // pick the length of the first ptn, we expect all ptns listed to have the same number of key-vals.
+    }
+    List<ExprNodeGenericFuncDesc> ptnDescs = new ArrayList<ExprNodeGenericFuncDesc>();
+    for (Map<String, String> ptn : partitions) {
+      // convert each key-value-map to appropriate expression.
+
+      ExprNodeGenericFuncDesc expr = null;
+      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
+        String key = kvp.getKey();
+        Object val = kvp.getValue();
+        // FIXME : bug here, value is being placed as a String, but should actually be the underlying type
+        // as converted to it by looking at the table's col schema. To do that, however, we need the
+        // tableObjJson from the DropTableMessage. So, currently, this will only work for partitions for
+        // which the partition keys are all strings. So, for now, we hardcode it, but we need to fix this.
+        String type = "string";
+
+        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
+        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
+        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate("=", column, new ExprNodeConstantDesc(pti, val));
+        expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
+      }
+      if (expr != null) {
+        ptnDescs.add(expr);
+      }
+    }
+    if (ptnDescs.size() > 0){
+      partSpecs.put(partPrefixLength,ptnDescs);
+    }
+    return partSpecs;
   }
 
   private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
@@ -709,8 +982,8 @@ private void analyzeReplStatus(ASTNode ast) throws SemanticException {
     }
 
     prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
-    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}" ,
-        String.valueOf(replLastId),ctx.getResFile());
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}",
+        String.valueOf(replLastId), ctx.getResFile());
   }
 
   private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
@@ -752,10 +1025,16 @@ private ReplicationSpec getNewReplicationSpec() throws SemanticException {
     }
   }
 
+  // Use for specifying object state as well as event state
   private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException {
     return new ReplicationSpec(true, false, evState, objState, false, true);
   }
 
+  // Use for replication states focused on event only, where the obj state will be the event state
+  private ReplicationSpec getNewEventOnlyReplicationSpec(String evState) throws SemanticException {
+    return getNewReplicationSpec(evState,evState);
+  }
+
   private Iterable<? extends String> matchesTbl(String dbName, String tblPattern) throws HiveException {
     if (tblPattern == null) {