diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index bbfbc36..df423f0 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -439,7 +439,7 @@ public void remove() { public void onInsert(InsertEvent insertEvent) throws MetaException { NotificationEvent event = new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage( - insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), + insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), insertEvent.isReplace(), new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums())) .toString()); event.setDbName(insertEvent.getDb()); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index aa2123e..ec238d2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -893,13 +893,81 @@ public void testIncrementalInserts() throws IOException { run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); String[] unptn_data = new String[] { "eleven", "twelve" }; + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')"); run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')"); - verifyRun("SELECT a from " + dbName + ".unptned", unptn_data); + verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data); run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned"); run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned"); - verifyRun("SELECT * from " + dbName + ".unptned_late", unptn_data); + verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + String incrementalDumpLocn = getResult(0, 0); + String incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data); + verifyRun("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data); + verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data); + verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data); + + String[] unptn_data_after_ins = new String[] { "eleven", "thirteen", "twelve" }; + String[] data_after_ovwrite = new String[] { "hundred" }; + run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[1] + "')"); + verifySetup("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data_after_ins); + run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')"); + verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + incrementalDumpLocn = getResult(0, 0); + incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + + verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data_after_ins); + + // Commenting the below verifications for the replication of insert overwrites until HIVE-15642 patch is in + //verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite); + } + + @Test + public void testIncrementalInsertToPartition() throws IOException { + String testName = "incrementalInsertToPartition"; + LOG.info("Testing " + testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0, 0); + String replDumpId = getResult(0, 1, true); + LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" }; + String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" }; + + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[1] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[2] + "')"); + + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[2] + "')"); + verifySetup("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2); advanceDumpDir(); run("REPL DUMP " + dbName + " FROM " + replDumpId); @@ -910,14 +978,14 @@ public void testIncrementalInserts() throws IOException { run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); printOutput(); run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); - verifyRun("SELECT a from " + dbName + ".unptned", unptn_data); - verifyRun("SELECT a from " + dbName + ".unptned_late", unptn_data); - verifyRun("SELECT a from " + dbName + "_dupe.unptned", unptn_data); - verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data); + verifyRun("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2); - String[] unptn_data_after_ins = new String[] { "eleven", "twelve", "thirteen" }; - run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[2] + "')"); - verifySetup("SELECT a from " + dbName + ".unptned_late", unptn_data_after_ins); + String[] data_after_ovwrite = new String[] { "hundred" }; + run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=2) values('" + data_after_ovwrite[0] + "')"); + verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite); advanceDumpDir(); run("REPL DUMP " + dbName + " FROM " + replDumpId); @@ -929,7 +997,8 @@ public void testIncrementalInserts() throws IOException { printOutput(); run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); - verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data_after_ins); + // Commenting the below verifications for the replication of insert overwrites until HIVE-15642 patch is in + //verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite); } @Test diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift index d056498..a1bdc30 100755 --- a/metastore/if/hive_metastore.thrift +++ b/metastore/if/hive_metastore.thrift @@ -812,9 +812,10 @@ struct CurrentNotificationEventId { } struct InsertEventRequestData { - 1: required list filesAdded, + 1: optional bool replace, + 2: required list filesAdded, // Checksum of files (hex string of checksum byte payload) - 2: optional list filesAddedChecksum, + 3: optional list filesAddedChecksum, } union FireEventRequestData { diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index be8429e..b38e1cb 100644 --- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -16126,6 +16126,11 @@ InsertEventRequestData::~InsertEventRequestData() throw() { } +void InsertEventRequestData::__set_replace(const bool val) { + this->replace = val; +__isset.replace = true; +} + void InsertEventRequestData::__set_filesAdded(const std::vector & val) { this->filesAdded = val; } @@ -16158,6 +16163,14 @@ uint32_t InsertEventRequestData::read(::apache::thrift::protocol::TProtocol* ipr switch (fid) { case 1: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->replace); + this->__isset.replace = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: if (ftype == ::apache::thrift::protocol::T_LIST) { { this->filesAdded.clear(); @@ -16177,7 +16190,7 @@ uint32_t InsertEventRequestData::read(::apache::thrift::protocol::TProtocol* ipr xfer += iprot->skip(ftype); } break; - case 2: + case 3: if (ftype == ::apache::thrift::protocol::T_LIST) { { this->filesAddedChecksum.clear(); @@ -16216,7 +16229,12 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); xfer += oprot->writeStructBegin("InsertEventRequestData"); - xfer += oprot->writeFieldBegin("filesAdded", ::apache::thrift::protocol::T_LIST, 1); + if (this->__isset.replace) { + xfer += oprot->writeFieldBegin("replace", ::apache::thrift::protocol::T_BOOL, 1); + xfer += oprot->writeBool(this->replace); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldBegin("filesAdded", ::apache::thrift::protocol::T_LIST, 2); { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast(this->filesAdded.size())); std::vector ::const_iterator _iter661; @@ -16229,7 +16247,7 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op xfer += oprot->writeFieldEnd(); if (this->__isset.filesAddedChecksum) { - xfer += oprot->writeFieldBegin("filesAddedChecksum", ::apache::thrift::protocol::T_LIST, 2); + xfer += oprot->writeFieldBegin("filesAddedChecksum", ::apache::thrift::protocol::T_LIST, 3); { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast(this->filesAddedChecksum.size())); std::vector ::const_iterator _iter662; @@ -16248,17 +16266,20 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op void swap(InsertEventRequestData &a, InsertEventRequestData &b) { using ::std::swap; + swap(a.replace, b.replace); swap(a.filesAdded, b.filesAdded); swap(a.filesAddedChecksum, b.filesAddedChecksum); swap(a.__isset, b.__isset); } InsertEventRequestData::InsertEventRequestData(const InsertEventRequestData& other663) { + replace = other663.replace; filesAdded = other663.filesAdded; filesAddedChecksum = other663.filesAddedChecksum; __isset = other663.__isset; } InsertEventRequestData& InsertEventRequestData::operator=(const InsertEventRequestData& other664) { + replace = other664.replace; filesAdded = other664.filesAdded; filesAddedChecksum = other664.filesAddedChecksum; __isset = other664.__isset; @@ -16267,7 +16288,8 @@ InsertEventRequestData& InsertEventRequestData::operator=(const InsertEventReque void InsertEventRequestData::printTo(std::ostream& out) const { using ::apache::thrift::to_string; out << "InsertEventRequestData("; - out << "filesAdded=" << to_string(filesAdded); + out << "replace="; (__isset.replace ? (out << to_string(replace)) : (out << "")); + out << ", " << "filesAdded=" << to_string(filesAdded); out << ", " << "filesAddedChecksum="; (__isset.filesAddedChecksum ? (out << to_string(filesAddedChecksum)) : (out << "")); out << ")"; } diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h index e73333a..50c61a7 100644 --- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -6555,7 +6555,8 @@ inline std::ostream& operator<<(std::ostream& out, const CurrentNotificationEven } typedef struct _InsertEventRequestData__isset { - _InsertEventRequestData__isset() : filesAddedChecksum(false) {} + _InsertEventRequestData__isset() : replace(false), filesAddedChecksum(false) {} + bool replace :1; bool filesAddedChecksum :1; } _InsertEventRequestData__isset; @@ -6564,21 +6565,28 @@ class InsertEventRequestData { InsertEventRequestData(const InsertEventRequestData&); InsertEventRequestData& operator=(const InsertEventRequestData&); - InsertEventRequestData() { + InsertEventRequestData() : replace(0) { } virtual ~InsertEventRequestData() throw(); + bool replace; std::vector filesAdded; std::vector filesAddedChecksum; _InsertEventRequestData__isset __isset; + void __set_replace(const bool val); + void __set_filesAdded(const std::vector & val); void __set_filesAddedChecksum(const std::vector & val); bool operator == (const InsertEventRequestData & rhs) const { + if (__isset.replace != rhs.__isset.replace) + return false; + else if (__isset.replace && !(replace == rhs.replace)) + return false; if (!(filesAdded == rhs.filesAdded)) return false; if (__isset.filesAddedChecksum != rhs.__isset.filesAddedChecksum) diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java index fd1dc06..354e634 100644 --- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java +++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java @@ -38,8 +38,9 @@ public class InsertEventRequestData implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InsertEventRequestData"); - private static final org.apache.thrift.protocol.TField FILES_ADDED_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAdded", org.apache.thrift.protocol.TType.LIST, (short)1); - private static final org.apache.thrift.protocol.TField FILES_ADDED_CHECKSUM_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAddedChecksum", org.apache.thrift.protocol.TType.LIST, (short)2); + private static final org.apache.thrift.protocol.TField REPLACE_FIELD_DESC = new org.apache.thrift.protocol.TField("replace", org.apache.thrift.protocol.TType.BOOL, (short)1); + private static final org.apache.thrift.protocol.TField FILES_ADDED_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAdded", org.apache.thrift.protocol.TType.LIST, (short)2); + private static final org.apache.thrift.protocol.TField FILES_ADDED_CHECKSUM_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAddedChecksum", org.apache.thrift.protocol.TType.LIST, (short)3); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -47,13 +48,15 @@ schemes.put(TupleScheme.class, new InsertEventRequestDataTupleSchemeFactory()); } + private boolean replace; // optional private List filesAdded; // required private List filesAddedChecksum; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { - FILES_ADDED((short)1, "filesAdded"), - FILES_ADDED_CHECKSUM((short)2, "filesAddedChecksum"); + REPLACE((short)1, "replace"), + FILES_ADDED((short)2, "filesAdded"), + FILES_ADDED_CHECKSUM((short)3, "filesAddedChecksum"); private static final Map byName = new HashMap(); @@ -68,9 +71,11 @@ */ public static _Fields findByThriftId(int fieldId) { switch(fieldId) { - case 1: // FILES_ADDED + case 1: // REPLACE + return REPLACE; + case 2: // FILES_ADDED return FILES_ADDED; - case 2: // FILES_ADDED_CHECKSUM + case 3: // FILES_ADDED_CHECKSUM return FILES_ADDED_CHECKSUM; default: return null; @@ -112,10 +117,14 @@ public String getFieldName() { } // isset id assignments - private static final _Fields optionals[] = {_Fields.FILES_ADDED_CHECKSUM}; + private static final int __REPLACE_ISSET_ID = 0; + private byte __isset_bitfield = 0; + private static final _Fields optionals[] = {_Fields.REPLACE,_Fields.FILES_ADDED_CHECKSUM}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.REPLACE, new org.apache.thrift.meta_data.FieldMetaData("replace", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.FILES_ADDED, new org.apache.thrift.meta_data.FieldMetaData("filesAdded", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); @@ -140,6 +149,8 @@ public InsertEventRequestData( * Performs a deep copy on other. */ public InsertEventRequestData(InsertEventRequestData other) { + __isset_bitfield = other.__isset_bitfield; + this.replace = other.replace; if (other.isSetFilesAdded()) { List __this__filesAdded = new ArrayList(other.filesAdded); this.filesAdded = __this__filesAdded; @@ -156,10 +167,34 @@ public InsertEventRequestData deepCopy() { @Override public void clear() { + setReplaceIsSet(false); + this.replace = false; this.filesAdded = null; this.filesAddedChecksum = null; } + public boolean isReplace() { + return this.replace; + } + + public void setReplace(boolean replace) { + this.replace = replace; + setReplaceIsSet(true); + } + + public void unsetReplace() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __REPLACE_ISSET_ID); + } + + /** Returns true if field replace is set (has been assigned a value) and false otherwise */ + public boolean isSetReplace() { + return EncodingUtils.testBit(__isset_bitfield, __REPLACE_ISSET_ID); + } + + public void setReplaceIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __REPLACE_ISSET_ID, value); + } + public int getFilesAddedSize() { return (this.filesAdded == null) ? 0 : this.filesAdded.size(); } @@ -238,6 +273,14 @@ public void setFilesAddedChecksumIsSet(boolean value) { public void setFieldValue(_Fields field, Object value) { switch (field) { + case REPLACE: + if (value == null) { + unsetReplace(); + } else { + setReplace((Boolean)value); + } + break; + case FILES_ADDED: if (value == null) { unsetFilesAdded(); @@ -259,6 +302,9 @@ public void setFieldValue(_Fields field, Object value) { public Object getFieldValue(_Fields field) { switch (field) { + case REPLACE: + return isReplace(); + case FILES_ADDED: return getFilesAdded(); @@ -276,6 +322,8 @@ public boolean isSet(_Fields field) { } switch (field) { + case REPLACE: + return isSetReplace(); case FILES_ADDED: return isSetFilesAdded(); case FILES_ADDED_CHECKSUM: @@ -297,6 +345,15 @@ public boolean equals(InsertEventRequestData that) { if (that == null) return false; + boolean this_present_replace = true && this.isSetReplace(); + boolean that_present_replace = true && that.isSetReplace(); + if (this_present_replace || that_present_replace) { + if (!(this_present_replace && that_present_replace)) + return false; + if (this.replace != that.replace) + return false; + } + boolean this_present_filesAdded = true && this.isSetFilesAdded(); boolean that_present_filesAdded = true && that.isSetFilesAdded(); if (this_present_filesAdded || that_present_filesAdded) { @@ -322,6 +379,11 @@ public boolean equals(InsertEventRequestData that) { public int hashCode() { List list = new ArrayList(); + boolean present_replace = true && (isSetReplace()); + list.add(present_replace); + if (present_replace) + list.add(replace); + boolean present_filesAdded = true && (isSetFilesAdded()); list.add(present_filesAdded); if (present_filesAdded) @@ -343,6 +405,16 @@ public int compareTo(InsertEventRequestData other) { int lastComparison = 0; + lastComparison = Boolean.valueOf(isSetReplace()).compareTo(other.isSetReplace()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetReplace()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.replace, other.replace); + if (lastComparison != 0) { + return lastComparison; + } + } lastComparison = Boolean.valueOf(isSetFilesAdded()).compareTo(other.isSetFilesAdded()); if (lastComparison != 0) { return lastComparison; @@ -383,6 +455,12 @@ public String toString() { StringBuilder sb = new StringBuilder("InsertEventRequestData("); boolean first = true; + if (isSetReplace()) { + sb.append("replace:"); + sb.append(this.replace); + first = false; + } + if (!first) sb.append(", "); sb.append("filesAdded:"); if (this.filesAdded == null) { sb.append("null"); @@ -423,6 +501,8 @@ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOExcept private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bitfield = 0; read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -447,7 +527,15 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, InsertEventRequestD break; } switch (schemeField.id) { - case 1: // FILES_ADDED + case 1: // REPLACE + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.replace = iprot.readBool(); + struct.setReplaceIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // FILES_ADDED if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { { org.apache.thrift.protocol.TList _list558 = iprot.readListBegin(); @@ -465,7 +553,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, InsertEventRequestD org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 2: // FILES_ADDED_CHECKSUM + case 3: // FILES_ADDED_CHECKSUM if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { { org.apache.thrift.protocol.TList _list561 = iprot.readListBegin(); @@ -496,6 +584,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, InsertEventRequest struct.validate(); oprot.writeStructBegin(STRUCT_DESC); + if (struct.isSetReplace()) { + oprot.writeFieldBegin(REPLACE_FIELD_DESC); + oprot.writeBool(struct.replace); + oprot.writeFieldEnd(); + } if (struct.filesAdded != null) { oprot.writeFieldBegin(FILES_ADDED_FIELD_DESC); { @@ -547,10 +640,16 @@ public void write(org.apache.thrift.protocol.TProtocol prot, InsertEventRequestD } } BitSet optionals = new BitSet(); - if (struct.isSetFilesAddedChecksum()) { + if (struct.isSetReplace()) { optionals.set(0); } - oprot.writeBitSet(optionals, 1); + if (struct.isSetFilesAddedChecksum()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetReplace()) { + oprot.writeBool(struct.replace); + } if (struct.isSetFilesAddedChecksum()) { { oprot.writeI32(struct.filesAddedChecksum.size()); @@ -576,8 +675,12 @@ public void read(org.apache.thrift.protocol.TProtocol prot, InsertEventRequestDa } } struct.setFilesAddedIsSet(true); - BitSet incoming = iprot.readBitSet(1); + BitSet incoming = iprot.readBitSet(2); if (incoming.get(0)) { + struct.replace = iprot.readBool(); + struct.setReplaceIsSet(true); + } + if (incoming.get(1)) { { org.apache.thrift.protocol.TList _list571 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32()); struct.filesAddedChecksum = new ArrayList(_list571.size); diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php index 2dfa1a9..4a3bf85 100644 --- a/metastore/src/gen/thrift/gen-php/metastore/Types.php +++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -16007,6 +16007,10 @@ class InsertEventRequestData { static $_TSPEC; /** + * @var bool + */ + public $replace = null; + /** * @var string[] */ public $filesAdded = null; @@ -16019,6 +16023,10 @@ class InsertEventRequestData { if (!isset(self::$_TSPEC)) { self::$_TSPEC = array( 1 => array( + 'var' => 'replace', + 'type' => TType::BOOL, + ), + 2 => array( 'var' => 'filesAdded', 'type' => TType::LST, 'etype' => TType::STRING, @@ -16026,7 +16034,7 @@ class InsertEventRequestData { 'type' => TType::STRING, ), ), - 2 => array( + 3 => array( 'var' => 'filesAddedChecksum', 'type' => TType::LST, 'etype' => TType::STRING, @@ -16037,6 +16045,9 @@ class InsertEventRequestData { ); } if (is_array($vals)) { + if (isset($vals['replace'])) { + $this->replace = $vals['replace']; + } if (isset($vals['filesAdded'])) { $this->filesAdded = $vals['filesAdded']; } @@ -16066,6 +16077,13 @@ class InsertEventRequestData { switch ($fid) { case 1: + if ($ftype == TType::BOOL) { + $xfer += $input->readBool($this->replace); + } else { + $xfer += $input->skip($ftype); + } + break; + case 2: if ($ftype == TType::LST) { $this->filesAdded = array(); $_size495 = 0; @@ -16082,7 +16100,7 @@ class InsertEventRequestData { $xfer += $input->skip($ftype); } break; - case 2: + case 3: if ($ftype == TType::LST) { $this->filesAddedChecksum = array(); $_size501 = 0; @@ -16112,11 +16130,16 @@ class InsertEventRequestData { public function write($output) { $xfer = 0; $xfer += $output->writeStructBegin('InsertEventRequestData'); + if ($this->replace !== null) { + $xfer += $output->writeFieldBegin('replace', TType::BOOL, 1); + $xfer += $output->writeBool($this->replace); + $xfer += $output->writeFieldEnd(); + } if ($this->filesAdded !== null) { if (!is_array($this->filesAdded)) { throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); } - $xfer += $output->writeFieldBegin('filesAdded', TType::LST, 1); + $xfer += $output->writeFieldBegin('filesAdded', TType::LST, 2); { $output->writeListBegin(TType::STRING, count($this->filesAdded)); { @@ -16133,7 +16156,7 @@ class InsertEventRequestData { if (!is_array($this->filesAddedChecksum)) { throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); } - $xfer += $output->writeFieldBegin('filesAddedChecksum', TType::LST, 2); + $xfer += $output->writeFieldBegin('filesAddedChecksum', TType::LST, 3); { $output->writeListBegin(TType::STRING, count($this->filesAddedChecksum)); { diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 3faf1bb..9480c85 100644 --- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -11156,17 +11156,20 @@ def __ne__(self, other): class InsertEventRequestData: """ Attributes: + - replace - filesAdded - filesAddedChecksum """ thrift_spec = ( None, # 0 - (1, TType.LIST, 'filesAdded', (TType.STRING,None), None, ), # 1 - (2, TType.LIST, 'filesAddedChecksum', (TType.STRING,None), None, ), # 2 + (1, TType.BOOL, 'replace', None, None, ), # 1 + (2, TType.LIST, 'filesAdded', (TType.STRING,None), None, ), # 2 + (3, TType.LIST, 'filesAddedChecksum', (TType.STRING,None), None, ), # 3 ) - def __init__(self, filesAdded=None, filesAddedChecksum=None,): + def __init__(self, replace=None, filesAdded=None, filesAddedChecksum=None,): + self.replace = replace self.filesAdded = filesAdded self.filesAddedChecksum = filesAddedChecksum @@ -11180,6 +11183,11 @@ def read(self, iprot): if ftype == TType.STOP: break if fid == 1: + if ftype == TType.BOOL: + self.replace = iprot.readBool() + else: + iprot.skip(ftype) + elif fid == 2: if ftype == TType.LIST: self.filesAdded = [] (_etype495, _size492) = iprot.readListBegin() @@ -11189,7 +11197,7 @@ def read(self, iprot): iprot.readListEnd() else: iprot.skip(ftype) - elif fid == 2: + elif fid == 3: if ftype == TType.LIST: self.filesAddedChecksum = [] (_etype501, _size498) = iprot.readListBegin() @@ -11209,15 +11217,19 @@ def write(self, oprot): oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) return oprot.writeStructBegin('InsertEventRequestData') + if self.replace is not None: + oprot.writeFieldBegin('replace', TType.BOOL, 1) + oprot.writeBool(self.replace) + oprot.writeFieldEnd() if self.filesAdded is not None: - oprot.writeFieldBegin('filesAdded', TType.LIST, 1) + oprot.writeFieldBegin('filesAdded', TType.LIST, 2) oprot.writeListBegin(TType.STRING, len(self.filesAdded)) for iter504 in self.filesAdded: oprot.writeString(iter504) oprot.writeListEnd() oprot.writeFieldEnd() if self.filesAddedChecksum is not None: - oprot.writeFieldBegin('filesAddedChecksum', TType.LIST, 2) + oprot.writeFieldBegin('filesAddedChecksum', TType.LIST, 3) oprot.writeListBegin(TType.STRING, len(self.filesAddedChecksum)) for iter505 in self.filesAddedChecksum: oprot.writeString(iter505) @@ -11234,6 +11246,7 @@ def validate(self): def __hash__(self): value = 17 + value = (value * 31) ^ hash(self.replace) value = (value * 31) ^ hash(self.filesAdded) value = (value * 31) ^ hash(self.filesAddedChecksum) return value diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index 5342451..7766071 100644 --- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2503,10 +2503,12 @@ end class InsertEventRequestData include ::Thrift::Struct, ::Thrift::Struct_Union - FILESADDED = 1 - FILESADDEDCHECKSUM = 2 + REPLACE = 1 + FILESADDED = 2 + FILESADDEDCHECKSUM = 3 FIELDS = { + REPLACE => {:type => ::Thrift::Types::BOOL, :name => 'replace', :optional => true}, FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}}, FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum', :element => {:type => ::Thrift::Types::STRING}, :optional => true} } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java index 7bc0e04..dff1195 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java @@ -38,6 +38,7 @@ private final String db; private final String table; private final Map keyValues; + private final boolean replace; private final List files; private List fileChecksums = new ArrayList(); @@ -56,6 +57,9 @@ public InsertEvent(String db, String table, List partVals, super(status, handler); this.db = db; this.table = table; + + // If replace flag is not set by caller, then by default set it to true to maintain backward compatibility + this.replace = (insertData.isSetReplace() ? insertData.isReplace() : true); this.files = insertData.getFilesAdded(); GetTableRequest req = new GetTableRequest(db, table); req.setCapabilities(HiveMetaStoreClient.TEST_VERSION); @@ -90,6 +94,13 @@ public String getTable() { } /** + * @return The replace flag. + */ + public boolean isReplace() { + return replace; + } + + /** * Get list of files created as a result of this DML operation * * @return list of new files diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java index 3d16721..6d146e0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java @@ -37,6 +37,12 @@ protected InsertMessage() { public abstract String getTable(); /** + * Getter for the replace flag being insert into/overwrite + * @return Replace flag to represent INSERT INTO or INSERT OVERWRITE (Boolean). + */ + public abstract boolean isReplace(); + + /** * Get the map of partition keyvalues. Will be null if this insert is to a table and not a * partition. * @return Map of partition keyvalues, or null. diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java index aa770f2..1ed7cc5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -231,9 +231,10 @@ public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Pa * @param table Name of the table the insert occurred in * @param partVals Partition values for the partition that the insert occurred in, may be null if * the insert was done into a non-partitioned table + * @param replace Flag to represent if INSERT OVERWRITE or INSERT INTO * @param files Iterator of file created * @return instance of InsertMessage */ public abstract InsertMessage buildInsertMessage(String db, String table, - Map partVals, Iterator files); + Map partVals, boolean replace, Iterator files); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java index e1316a4..c059d47 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java @@ -40,6 +40,9 @@ Long timestamp; @JsonProperty + String replace; + + @JsonProperty List files; @JsonProperty @@ -52,12 +55,13 @@ public JSONInsertMessage() { } public JSONInsertMessage(String server, String servicePrincipal, String db, String table, - Map partKeyVals, Iterator fileIter, Long timestamp) { + Map partKeyVals, boolean replace, Iterator fileIter, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; this.table = table; this.timestamp = timestamp; + this.replace = Boolean.toString(replace); this.partKeyVals = partKeyVals; this.files = Lists.newArrayList(fileIter); checkValid(); @@ -99,6 +103,9 @@ public Long getTimestamp() { } @Override + public boolean isReplace() { return Boolean.parseBoolean(replace); } + + @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index 3406afb..bb81949 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -161,10 +161,9 @@ public AlterIndexMessage buildAlterIndexMessage(Index before, Index after) { } @Override - public InsertMessage buildInsertMessage(String db, String table, Map partKeyVals, + public InsertMessage buildInsertMessage(String db, String table, Map partKeyVals, boolean replace, Iterator fileIter) { - return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, - fileIter, now()); + return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, replace, fileIter, now()); } private long now() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 6deea96..c66bbdf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1662,7 +1662,7 @@ public Partition loadPartition(Path loadPath, Table tbl, alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString()); validatePartition(newTPart); if ((null != newFiles) || replace) { - fireInsertEvent(tbl, partSpec, newFiles); + fireInsertEvent(tbl, partSpec, replace, newFiles); } else { LOG.debug("No new files were created, and is not a replace. Skipping generating INSERT event."); } @@ -2056,7 +2056,7 @@ public void loadTable(Path loadPath, String tableName, boolean replace, boolean throw new HiveException(e); } - fireInsertEvent(tbl, null, newFiles); + fireInsertEvent(tbl, null, replace, newFiles); } /** @@ -2281,7 +2281,7 @@ public Partition getPartition(Table tbl, Map partSpec, } else { alterPartitionSpec(tbl, partSpec, tpart, inheritTableSpecs, partPath); - fireInsertEvent(tbl, partSpec, newFiles); + fireInsertEvent(tbl, partSpec, true, newFiles); } } if (tpart == null) { @@ -2331,7 +2331,7 @@ private void alterPartitionSpecInMemory(Table tbl, tpart.getSd().setLocation(partPath); } - private void fireInsertEvent(Table tbl, Map partitionSpec, List newFiles) + private void fireInsertEvent(Table tbl, Map partitionSpec, boolean replace, List newFiles) throws HiveException { if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) { LOG.debug("Firing dml insert event"); @@ -2343,6 +2343,7 @@ private void fireInsertEvent(Table tbl, Map partitionSpec, List< FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf); FireEventRequestData data = new FireEventRequestData(); InsertEventRequestData insertData = new InsertEventRequestData(); + insertData.setReplace(replace); data.setInsertData(insertData); if (newFiles != null && newFiles.size() > 0) { for (Path p : newFiles) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 245c483..3a1fc70 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -413,7 +413,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, x.getOutputs(), addPartitionDesc), x.getConf()); LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, Utilities.getTableDesc(table), - partSpec.getPartSpec(), true); + partSpec.getPartSpec(), replicationSpec.isReplace()); loadTableWork.setInheritTableSpecs(false); Task loadPartTask = TaskFactory.get(new MoveWork( x.getInputs(), x.getOutputs(), loadTableWork, null, false), @@ -921,7 +921,7 @@ private static void createReplImportTasks( } if (!replicationSpec.isMetadataOnly()) { // repl-imports are replace-into unless the event is insert-into - loadTable(fromURI, table, !replicationSpec.isInsert(), new Path(fromURI), replicationSpec, x); + loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), replicationSpec, x); } else { x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 3ac7746..a85ba42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -452,7 +452,6 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Ex getNewEventOnlyReplicationSpec(ev.getEventId()) ); EventHandlerFactory.handlerFor(ev).handle(context); - } public static void injectNextDumpDirForTest(String dumpdir){ @@ -1223,7 +1222,7 @@ private ReplicationSpec getNewReplicationSpec() throws SemanticException { // Use for specifying object state as well as event state private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException { - return new ReplicationSpec(true, false, evState, objState, false, true, false); + return new ReplicationSpec(true, false, evState, objState, false, true, true); } // Use for replication states focused on event only, where the obj state will be the event state diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 48362a3..1ea608b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -44,7 +44,7 @@ private String currStateId = null; private boolean isNoop = false; private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in. - private boolean isInsert = false; // default is that the import mode is replace-into + private boolean isReplace = true; // default is that the import mode is insert overwrite // Key definitions related to replication public enum KEY { @@ -53,7 +53,7 @@ CURR_STATE_ID("repl.last.id"), NOOP("repl.noop"), LAZY("repl.lazy"), - IS_INSERT("repl.is.insert") + IS_REPLACE("repl.is.replace") ; private final String keyName; @@ -136,14 +136,14 @@ public ReplicationSpec(){ public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, String eventReplicationState, String currentReplicationState, - boolean isNoop, boolean isLazy, boolean isInsert) { + boolean isNoop, boolean isLazy, boolean isReplace) { this.isInReplicationScope = isInReplicationScope; this.isMetadataOnly = isMetadataOnly; this.eventId = eventReplicationState; this.currStateId = currentReplicationState; this.isNoop = isNoop; this.isLazy = isLazy; - this.isInsert = isInsert; + this.isReplace = isReplace; } public ReplicationSpec(Function keyFetcher) { @@ -162,7 +162,7 @@ public ReplicationSpec(Function keyFetcher) { this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString()); this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString())); this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString())); - this.isInsert = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_INSERT.toString())); + this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString())); } /** @@ -296,12 +296,12 @@ public void setIsMetadataOnly(boolean isMetadataOnly){ } /** - * @return true if this statement refers to insert-into operation. + * @return true if this statement refers to insert-into or insert-overwrite operation. */ - public boolean isInsert(){ return isInsert; } + public boolean isReplace(){ return isReplace; } - public void setIsInsert(boolean isInsert){ - this.isInsert = isInsert; + public void setIsReplace(boolean isReplace){ + this.isReplace = isReplace; } /** @@ -370,8 +370,8 @@ public String get(KEY key) { return String.valueOf(isNoop()); case LAZY: return String.valueOf(isLazy()); - case IS_INSERT: - return String.valueOf(isInsert()); + case IS_REPLACE: + return String.valueOf(isReplace()); } return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java index 1346276..e9f2a6a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java @@ -51,16 +51,30 @@ public void handle(Context withinContext) throws Exception { qlPtns = Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false)); } Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); - // Mark the replication type as insert into to avoid overwrite while import - withinContext.replicationSpec.setIsInsert(true); + + // Mark the replace type based on INSERT-INTO or INSERT_OVERWRITE operation + withinContext.replicationSpec.setIsReplace(insertMsg.isReplace()); EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath, qlMdTable, qlPtns, withinContext.replicationSpec); Iterable files = insertMsg.getFiles(); if (files != null) { + Path dataPath; + if ((null == qlPtns) || qlPtns.isEmpty()) { + dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); + } else { + /* + * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple + * tables. But, Insert event is generated for each partition to which the data is inserted. So, qlPtns list + * will have only one entry. + */ + assert(1 == qlPtns.size()); + dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName()); + } + // encoded filename/checksum of files, write into _files - try (BufferedWriter fileListWriter = writer(withinContext)) { + try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { for (String file : files) { fileListWriter.write(file + "\n"); } @@ -82,8 +96,7 @@ public void handle(Context withinContext) throws Exception { ); } - private BufferedWriter writer(Context withinContext) throws IOException { - Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); + private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException { Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf); return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));