diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 6f96e1d..e598a6b 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -437,13 +437,14 @@ public void remove() { } @Override public void onInsert(InsertEvent insertEvent) throws MetaException { + Table tableObj = insertEvent.getTableObj(); NotificationEvent event = - new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage( - insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), insertEvent.isReplace(), + new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(tableObj, + insertEvent.getPartitionObj(), insertEvent.isReplace(), new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums())) - .toString()); - event.setDbName(insertEvent.getDb()); - event.setTableName(insertEvent.getTable()); + .toString()); + event.setDbName(tableObj.getDbName()); + event.setTableName(tableObj.getTableName()); process(event, insertEvent); } diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 2168a67..808c9c7 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -1227,8 +1227,9 @@ public void insertPartition() throws Exception { FieldSchema partCol1 = new FieldSchema("ds", "string", "no comment"); List partCols = new ArrayList(); List partCol1Vals = Arrays.asList("today"); - LinkedHashMap partKeyVals = new LinkedHashMap(); - partKeyVals.put("ds", "today"); + List partKeyVals = new ArrayList(); + partKeyVals.add("today"); + partCols.add(partCol1); Table table = new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, partCols, @@ -1264,9 +1265,9 @@ public void insertPartition() throws Exception { // Parse the message field verifyInsert(event, defaultDbName, tblName); InsertMessage insertMessage = md.getInsertMessage(event.getMessage()); - Map partKeyValsFromNotif = insertMessage.getPartitionKeyValues(); + List ptnValues = insertMessage.getPtnObj().getValues(); - assertMapEquals(partKeyVals, partKeyValsFromNotif); + assertEquals(partKeyVals, ptnValues); // Verify the eventID was passed to the non-transactional listener MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 3); @@ -1528,31 +1529,16 @@ private void verifyInsert(NotificationEvent event, String dbName, String tblName InsertMessage insertMsg = md.getInsertMessage(event.getMessage()); System.out.println("InsertMessage: " + insertMsg.toString()); if (dbName != null ){ - assertEquals(dbName, insertMsg.getDB()); + assertEquals(dbName, insertMsg.getTableObj().getDbName()); } if (tblName != null){ - assertEquals(tblName, insertMsg.getTable()); + assertEquals(tblName, insertMsg.getTableObj().getTableName()); } // Should have files Iterator files = insertMsg.getFiles().iterator(); assertTrue(files.hasNext()); } - - private void assertMapEquals(Map map1, Map map2) { - // non ordered, non-classed map comparison - use sparingly instead of assertEquals - // only if you're sure that the order does not matter. - if ((map1 == null) || (map2 == null)){ - assertNull(map1); - assertNull(map2); - } - assertEquals(map1.size(),map2.size()); - for (String k : map1.keySet()){ - assertTrue(map2.containsKey(k)); - assertEquals(map1.get(k), map2.get(k)); - } - } - @Test public void cleanupNotifs() throws Exception { Database db = new Database("cleanup1", "no description", "file:/tmp", emptyParameters); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 21f09ae..766d858 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -1282,6 +1282,136 @@ public void testInsertToMultiKeyPartition() throws IOException { } @Test + public void testIncrementalInsertDropUnpartitionedTable() throws IOException { + String testName = "incrementalInsertDropUnpartitionedTable"; + String dbName = createDB(testName); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0, 0); + String replDumpId = getResult(0, 1, true); + LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + String[] unptn_data = new String[] { "eleven", "twelve" }; + + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')"); + verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data); + + run("CREATE TABLE " + dbName + ".unptned_tmp AS SELECT * FROM " + dbName + ".unptned"); + verifySetup("SELECT a from " + dbName + ".unptned_tmp ORDER BY a", unptn_data); + + // Get the last repl ID corresponding to all insert/alter/create events except DROP. + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + String lastDumpIdWithoutDrop = getResult(0, 1); + + // Drop all the tables + run("DROP TABLE " + dbName + ".unptned"); + run("DROP TABLE " + dbName + ".unptned_tmp"); + verifyFail("SELECT * FROM " + dbName + ".unptned"); + verifyFail("SELECT * FROM " + dbName + ".unptned_tmp"); + + // Dump all the events except DROP + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + lastDumpIdWithoutDrop); + String incrementalDumpLocn = getResult(0, 0); + String incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + + // Need to find the tables and data as drop is not part of this dump + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data); + verifyRun("SELECT a from " + dbName + "_dupe.unptned_tmp ORDER BY a", unptn_data); + + // Dump the drop events and check if tables are getting dropped in target as well + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + incrementalDumpLocn = getResult(0, 0); + incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyFail("SELECT * FROM " + dbName + ".unptned"); + verifyFail("SELECT * FROM " + dbName + ".unptned_tmp"); + } + + @Test + public void testIncrementalInsertDropPartitionedTable() throws IOException { + String testName = "incrementalInsertDropPartitionedTable"; + String dbName = createDB(testName); + run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0, 0); + String replDumpId = getResult(0, 1, true); + LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" }; + String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" }; + + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[1] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[2] + "')"); + + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=20)"); + run("ALTER TABLE " + dbName + ".ptned RENAME PARTITION (b=20) TO PARTITION (b=2"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[2] + "')"); + verifySetup("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2); + + run("CREATE TABLE " + dbName + ".ptned_tmp AS SELECT * FROM " + dbName + ".ptned"); + verifySetup("SELECT a from " + dbName + ".ptned_tmp where (b=1) ORDER BY a", ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned_tmp where (b=2) ORDER BY a", ptn_data_2); + + // Get the last repl ID corresponding to all insert/alter/create events except DROP. + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + String lastDumpIdWithoutDrop = getResult(0, 1); + + // Drop all the tables + run("DROP TABLE " + dbName + ".ptned_tmp"); + run("DROP TABLE " + dbName + ".ptned"); + verifyFail("SELECT * FROM " + dbName + ".ptned_tmp"); + verifyFail("SELECT * FROM " + dbName + ".ptned"); + + // Dump all the events except DROP + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + lastDumpIdWithoutDrop); + String incrementalDumpLocn = getResult(0, 0); + String incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + + // Need to find the tables and data as drop is not part of this dump + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_tmp where (b=1) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_tmp where (b=2) ORDER BY a", ptn_data_2); + + // Dump the drop events and check if tables are getting dropped in target as well + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + incrementalDumpLocn = getResult(0, 0); + incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyFail("SELECT * FROM " + dbName + ".ptned_tmp"); + verifyFail("SELECT * FROM " + dbName + ".ptned"); + } + + @Test public void testViewsReplication() throws IOException { String testName = "viewsReplication"; String dbName = createDB(testName); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java index dff1195..c33ade1 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java @@ -24,20 +24,16 @@ import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import java.util.ArrayList; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; public class InsertEvent extends ListenerEvent { - // Note that this event is fired from the client, so rather than having full metastore objects - // we have just the string names, but that's fine for what we need. - private final String db; - private final String table; - private final Map keyValues; + private final Table tableObj; + private final Partition ptnObj; private final boolean replace; private final List files; private List fileChecksums = new ArrayList(); @@ -55,42 +51,36 @@ public InsertEvent(String db, String table, List partVals, InsertEventRequestData insertData, boolean status, HMSHandler handler) throws MetaException, NoSuchObjectException { super(status, handler); - this.db = db; - this.table = table; - // If replace flag is not set by caller, then by default set it to true to maintain backward compatibility - this.replace = (insertData.isSetReplace() ? insertData.isReplace() : true); - this.files = insertData.getFilesAdded(); GetTableRequest req = new GetTableRequest(db, table); req.setCapabilities(HiveMetaStoreClient.TEST_VERSION); - Table t = handler.get_table_req(req).getTable(); - keyValues = new LinkedHashMap(); + this.tableObj = handler.get_table_req(req).getTable(); if (partVals != null) { - for (int i = 0; i < partVals.size(); i++) { - keyValues.put(t.getPartitionKeys().get(i).getName(), partVals.get(i)); - } + this.ptnObj = handler.get_partition(db, table, partVals); + } else { + this.ptnObj = null; } + + // If replace flag is not set by caller, then by default set it to true to maintain backward compatibility + this.replace = (insertData.isSetReplace() ? insertData.isReplace() : true); + this.files = insertData.getFilesAdded(); if (insertData.isSetFilesAddedChecksum()) { fileChecksums = insertData.getFilesAddedChecksum(); } } - public String getDb() { - return db; - } - /** - * @return The table. + * @return Table object */ - public String getTable() { - return table; + public Table getTableObj() { + return tableObj; } /** - * @return List of values for the partition keys. + * @return Partition object */ - public Map getPartitionKeyValues() { - return keyValues; + public Partition getPartitionObj() { + return ptnObj; } /** diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java index 6d146e0..6505c67 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java @@ -19,7 +19,8 @@ package org.apache.hadoop.hive.metastore.messaging; -import java.util.Map; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; /** * HCat message sent when an insert is done to a table or partition. @@ -43,19 +44,26 @@ protected InsertMessage() { public abstract boolean isReplace(); /** - * Get the map of partition keyvalues. Will be null if this insert is to a table and not a - * partition. - * @return Map of partition keyvalues, or null. - */ - public abstract Map getPartitionKeyValues(); - - /** * Get list of file name and checksum created as a result of this DML operation * * @return The iterable of files */ public abstract Iterable getFiles(); + /** + * Get the table object associated with the insert + * + * @return The Json format of Table object + */ + public abstract Table getTableObj() throws Exception; + + /** + * Get the partition object associated with the insert + * + * @return The Json format of Partition object if the table is partitioned else return null. + */ + public abstract Partition getPtnObj() throws Exception; + @Override public EventMessage checkValid() { if (getTable() == null) diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java index 1bd52a8..9437e8b 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -29,7 +29,6 @@ import org.apache.hadoop.util.ReflectionUtils; import java.util.Iterator; -import java.util.Map; /** * Abstract Factory for the construction of HCatalog message instances. @@ -229,14 +228,13 @@ public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Pa /** * Factory method for building insert message * - * @param db Name of the database the insert occurred in - * @param table Name of the table the insert occurred in - * @param partVals Partition values for the partition that the insert occurred in, may be null if + * @param tableObj Table object where the insert occurred in + * @param ptnObj Partition object where the insert occurred in, may be null if * the insert was done into a non-partitioned table * @param replace Flag to represent if INSERT OVERWRITE or INSERT INTO * @param files Iterator of file created * @return instance of InsertMessage */ - public abstract InsertMessage buildInsertMessage(String db, String table, - Map partVals, boolean replace, Iterator files); + public abstract InsertMessage buildInsertMessage(Table tableObj, Partition ptnObj, + boolean replace, Iterator files); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java index c059d47..18a15f5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java @@ -19,14 +19,16 @@ package org.apache.hadoop.hive.metastore.messaging.json; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.messaging.InsertMessage; +import org.apache.thrift.TException; import org.codehaus.jackson.annotate.JsonProperty; import com.google.common.collect.Lists; import java.util.Iterator; import java.util.List; -import java.util.Map; /** * JSON implementation of InsertMessage @@ -34,7 +36,7 @@ public class JSONInsertMessage extends InsertMessage { @JsonProperty - String server, servicePrincipal, db, table; + String server, servicePrincipal, db, table, tableObjJson, ptnObjJson; @JsonProperty Long timestamp; @@ -45,25 +47,39 @@ @JsonProperty List files; - @JsonProperty - Map partKeyVals; - /** * Default constructor, needed for Jackson. */ public JSONInsertMessage() { } - public JSONInsertMessage(String server, String servicePrincipal, String db, String table, - Map partKeyVals, boolean replace, Iterator fileIter, Long timestamp) { + public JSONInsertMessage(String server, String servicePrincipal, Table tableObj, Partition ptnObj, + boolean replace, Iterator fileIter, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; - this.db = db; - this.table = table; + + if (null == tableObj) { + throw new IllegalArgumentException("Table not valid."); + } + + this.db = tableObj.getDbName(); + this.table = tableObj.getTableName(); + + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + if (null != ptnObj) { + this.ptnObjJson = JSONMessageFactory.createPartitionObjJson(ptnObj); + } else { + this.ptnObjJson = null; + } + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + this.timestamp = timestamp; this.replace = Boolean.toString(replace); - this.partKeyVals = partKeyVals; this.files = Lists.newArrayList(fileIter); + checkValid(); } @@ -78,11 +94,6 @@ public String getServer() { } @Override - public Map getPartitionKeyValues() { - return partKeyVals; - } - - @Override public Iterable getFiles() { return files; } @@ -106,6 +117,16 @@ public Long getTimestamp() { public boolean isReplace() { return Boolean.parseBoolean(replace); } @Override + public Table getTableObj() throws Exception { + return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class); + } + + @Override + public Partition getPtnObj() throws Exception { + return ((null == ptnObjJson) ? null : (Partition) JSONMessageFactory.getTObj(ptnObjJson, Partition.class)); + } + + @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index 04a4041..a4c31f2 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -28,10 +28,6 @@ import com.google.common.collect.Iterables; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.Index; @@ -165,9 +161,9 @@ public AlterIndexMessage buildAlterIndexMessage(Index before, Index after) { } @Override - public InsertMessage buildInsertMessage(String db, String table, Map partKeyVals, boolean replace, - Iterator fileIter) { - return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, replace, fileIter, now()); + public InsertMessage buildInsertMessage(Table tableObj, Partition partObj, + boolean replace, Iterator fileIter) { + return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, tableObj, partObj, replace, fileIter, now()); } private long now() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index f514fb2..956bb08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -23,18 +23,15 @@ import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.parse.EximUtil; -import org.apache.thrift.TException; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.Collections; import java.util.List; -import java.util.Map; -import org.apache.hadoop.hive.ql.parse.repl.DumpType; - -import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; class InsertHandler extends AbstractEventHandler { @@ -45,11 +42,10 @@ @Override public void handle(Context withinContext) throws Exception { InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage()); - org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(withinContext, insertMsg); - Map partSpec = insertMsg.getPartitionKeyValues(); + org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(insertMsg); List qlPtns = null; - if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) { - qlPtns = Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false)); + if (qlMdTable.isPartitioned() && (null != insertMsg.getPtnObj())) { + qlPtns = Collections.singletonList(partitionObject(qlMdTable, insertMsg)); } Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); @@ -88,13 +84,13 @@ public void handle(Context withinContext) throws Exception { dmd.write(); } - private org.apache.hadoop.hive.ql.metadata.Table tableObject( - Context withinContext, InsertMessage insertMsg) throws TException { - return new org.apache.hadoop.hive.ql.metadata.Table( - withinContext.db.getMSC().getTable( - insertMsg.getDB(), insertMsg.getTable() - ) - ); + private org.apache.hadoop.hive.ql.metadata.Table tableObject(InsertMessage insertMsg) throws Exception { + return new org.apache.hadoop.hive.ql.metadata.Table(insertMsg.getTableObj()); + } + + private org.apache.hadoop.hive.ql.metadata.Partition partitionObject( + org.apache.hadoop.hive.ql.metadata.Table qlMdTable, InsertMessage insertMsg) throws Exception { + return new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable, insertMsg.getPtnObj()); } private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {