diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 1360563..c9f130c 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -531,12 +531,15 @@ public void sqlInsertPartition() throws Exception { driver.run("insert into table sip partition (ds) values (3, 'tomorrow')"); driver.run("alter table sip drop partition (ds = 'tomorrow')"); + driver.run("insert into table sip partition (ds) values (42, 'todaytwo')"); + driver.run("insert overwrite table sip partition(ds='todaytwo') select c from sip where 'ds'='today'"); + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); for (NotificationEvent ne : rsp.getEvents()) LOG.debug("EVENT: " + ne.getMessage()); // For reasons not clear to me there's one or more alter partitions after add partition and // insert. - assertEquals(19, rsp.getEventsSize()); + assertEquals(24, rsp.getEventsSize()); NotificationEvent event = rsp.getEvents().get(1); assertEquals(firstEventId + 2, event.getEventId()); assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType()); @@ -566,7 +569,29 @@ public void sqlInsertPartition() throws Exception { event = rsp.getEvents().get(18); assertEquals(firstEventId + 19, event.getEventId()); assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType()); - } + + event = rsp.getEvents().get(19); + assertEquals(firstEventId + 20, event.getEventId()); + assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType()); + event = rsp.getEvents().get(20); + assertEquals(firstEventId + 21, event.getEventId()); + assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType()); + assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); + + event = rsp.getEvents().get(21); + assertEquals(firstEventId + 22, event.getEventId()); + assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType()); + assertTrue(event.getMessage().matches(".*\"files\":\\[\\].*")); // replace-overwrite introduces no new files + event = rsp.getEvents().get(22); + assertEquals(firstEventId + 23, event.getEventId()); + assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType()); + assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); + event = rsp.getEvents().get(23); + assertEquals(firstEventId + 24, event.getEventId()); + assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType()); + assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); + + } @Test public void cleanupNotifs() throws Exception { diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift index 4d92b73..db0abcb 100755 --- a/metastore/if/hive_metastore.thrift +++ b/metastore/if/hive_metastore.thrift @@ -800,7 +800,8 @@ struct CurrentNotificationEventId { } struct InsertEventRequestData { - 1: required list filesAdded + 1: required list filesAdded, + 2: optional bool overwrite } union FireEventRequestData { diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index 79460a8..cb8c99e 100644 --- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -15833,6 +15833,11 @@ void InsertEventRequestData::__set_filesAdded(const std::vector & v this->filesAdded = val; } +void InsertEventRequestData::__set_overwrite(const bool val) { + this->overwrite = val; +__isset.overwrite = true; +} + uint32_t InsertEventRequestData::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -15875,6 +15880,14 @@ uint32_t InsertEventRequestData::read(::apache::thrift::protocol::TProtocol* ipr xfer += iprot->skip(ftype); } break; + case 2: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->overwrite); + this->__isset.overwrite = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -15906,6 +15919,11 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op } xfer += oprot->writeFieldEnd(); + if (this->__isset.overwrite) { + xfer += oprot->writeFieldBegin("overwrite", ::apache::thrift::protocol::T_BOOL, 2); + xfer += oprot->writeBool(this->overwrite); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -15914,19 +15932,26 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op void swap(InsertEventRequestData &a, InsertEventRequestData &b) { using ::std::swap; swap(a.filesAdded, b.filesAdded); + swap(a.overwrite, b.overwrite); + swap(a.__isset, b.__isset); } InsertEventRequestData::InsertEventRequestData(const InsertEventRequestData& other655) { filesAdded = other655.filesAdded; + overwrite = other655.overwrite; + __isset = other655.__isset; } InsertEventRequestData& InsertEventRequestData::operator=(const InsertEventRequestData& other656) { filesAdded = other656.filesAdded; + overwrite = other656.overwrite; + __isset = other656.__isset; return *this; } void InsertEventRequestData::printTo(std::ostream& out) const { using ::apache::thrift::to_string; out << "InsertEventRequestData("; out << "filesAdded=" << to_string(filesAdded); + out << ", " << "overwrite="; (__isset.overwrite ? (out << to_string(overwrite)) : (out << "")); out << ")"; } diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h index ec81798..51cb11d 100644 --- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -6423,24 +6423,37 @@ inline std::ostream& operator<<(std::ostream& out, const CurrentNotificationEven return out; } +typedef struct _InsertEventRequestData__isset { + _InsertEventRequestData__isset() : overwrite(false) {} + bool overwrite :1; +} _InsertEventRequestData__isset; class InsertEventRequestData { public: InsertEventRequestData(const InsertEventRequestData&); InsertEventRequestData& operator=(const InsertEventRequestData&); - InsertEventRequestData() { + InsertEventRequestData() : overwrite(0) { } virtual ~InsertEventRequestData() throw(); std::vector filesAdded; + bool overwrite; + + _InsertEventRequestData__isset __isset; void __set_filesAdded(const std::vector & val); + void __set_overwrite(const bool val); + bool operator == (const InsertEventRequestData & rhs) const { if (!(filesAdded == rhs.filesAdded)) return false; + if (__isset.overwrite != rhs.__isset.overwrite) + return false; + else if (__isset.overwrite && !(overwrite == rhs.overwrite)) + return false; return true; } bool operator != (const InsertEventRequestData &rhs) const { diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java index a8df524..8f27d61 100644 --- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java +++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java @@ -39,6 +39,7 @@ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InsertEventRequestData"); private static final org.apache.thrift.protocol.TField FILES_ADDED_FIELD_DESC = new org.apache.thrift.protocol.TField("filesAdded", org.apache.thrift.protocol.TType.LIST, (short)1); + private static final org.apache.thrift.protocol.TField OVERWRITE_FIELD_DESC = new org.apache.thrift.protocol.TField("overwrite", org.apache.thrift.protocol.TType.BOOL, (short)2); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -47,10 +48,12 @@ } private List filesAdded; // required + private boolean overwrite; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { - FILES_ADDED((short)1, "filesAdded"); + FILES_ADDED((short)1, "filesAdded"), + OVERWRITE((short)2, "overwrite"); private static final Map byName = new HashMap(); @@ -67,6 +70,8 @@ public static _Fields findByThriftId(int fieldId) { switch(fieldId) { case 1: // FILES_ADDED return FILES_ADDED; + case 2: // OVERWRITE + return OVERWRITE; default: return null; } @@ -107,12 +112,17 @@ public String getFieldName() { } // isset id assignments + private static final int __OVERWRITE_ISSET_ID = 0; + private byte __isset_bitfield = 0; + private static final _Fields optionals[] = {_Fields.OVERWRITE}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); tmpMap.put(_Fields.FILES_ADDED, new org.apache.thrift.meta_data.FieldMetaData("filesAdded", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); + tmpMap.put(_Fields.OVERWRITE, new org.apache.thrift.meta_data.FieldMetaData("overwrite", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(InsertEventRequestData.class, metaDataMap); } @@ -131,10 +141,12 @@ public InsertEventRequestData( * Performs a deep copy on other. */ public InsertEventRequestData(InsertEventRequestData other) { + __isset_bitfield = other.__isset_bitfield; if (other.isSetFilesAdded()) { List __this__filesAdded = new ArrayList(other.filesAdded); this.filesAdded = __this__filesAdded; } + this.overwrite = other.overwrite; } public InsertEventRequestData deepCopy() { @@ -144,6 +156,8 @@ public InsertEventRequestData deepCopy() { @Override public void clear() { this.filesAdded = null; + setOverwriteIsSet(false); + this.overwrite = false; } public int getFilesAddedSize() { @@ -184,6 +198,28 @@ public void setFilesAddedIsSet(boolean value) { } } + public boolean isOverwrite() { + return this.overwrite; + } + + public void setOverwrite(boolean overwrite) { + this.overwrite = overwrite; + setOverwriteIsSet(true); + } + + public void unsetOverwrite() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __OVERWRITE_ISSET_ID); + } + + /** Returns true if field overwrite is set (has been assigned a value) and false otherwise */ + public boolean isSetOverwrite() { + return EncodingUtils.testBit(__isset_bitfield, __OVERWRITE_ISSET_ID); + } + + public void setOverwriteIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __OVERWRITE_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case FILES_ADDED: @@ -194,6 +230,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case OVERWRITE: + if (value == null) { + unsetOverwrite(); + } else { + setOverwrite((Boolean)value); + } + break; + } } @@ -202,6 +246,9 @@ public Object getFieldValue(_Fields field) { case FILES_ADDED: return getFilesAdded(); + case OVERWRITE: + return isOverwrite(); + } throw new IllegalStateException(); } @@ -215,6 +262,8 @@ public boolean isSet(_Fields field) { switch (field) { case FILES_ADDED: return isSetFilesAdded(); + case OVERWRITE: + return isSetOverwrite(); } throw new IllegalStateException(); } @@ -241,6 +290,15 @@ public boolean equals(InsertEventRequestData that) { return false; } + boolean this_present_overwrite = true && this.isSetOverwrite(); + boolean that_present_overwrite = true && that.isSetOverwrite(); + if (this_present_overwrite || that_present_overwrite) { + if (!(this_present_overwrite && that_present_overwrite)) + return false; + if (this.overwrite != that.overwrite) + return false; + } + return true; } @@ -253,6 +311,11 @@ public int hashCode() { if (present_filesAdded) list.add(filesAdded); + boolean present_overwrite = true && (isSetOverwrite()); + list.add(present_overwrite); + if (present_overwrite) + list.add(overwrite); + return list.hashCode(); } @@ -274,6 +337,16 @@ public int compareTo(InsertEventRequestData other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetOverwrite()).compareTo(other.isSetOverwrite()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetOverwrite()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.overwrite, other.overwrite); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -301,6 +374,12 @@ public String toString() { sb.append(this.filesAdded); } first = false; + if (isSetOverwrite()) { + if (!first) sb.append(", "); + sb.append("overwrite:"); + sb.append(this.overwrite); + first = false; + } sb.append(")"); return sb.toString(); } @@ -324,6 +403,8 @@ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOExcept private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bitfield = 0; read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -366,6 +447,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, InsertEventRequestD org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 2: // OVERWRITE + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.overwrite = iprot.readBool(); + struct.setOverwriteIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -391,6 +480,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, InsertEventRequest } oprot.writeFieldEnd(); } + if (struct.isSetOverwrite()) { + oprot.writeFieldBegin(OVERWRITE_FIELD_DESC); + oprot.writeBool(struct.overwrite); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -415,6 +509,14 @@ public void write(org.apache.thrift.protocol.TProtocol prot, InsertEventRequestD oprot.writeString(_iter562); } } + BitSet optionals = new BitSet(); + if (struct.isSetOverwrite()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetOverwrite()) { + oprot.writeBool(struct.overwrite); + } } @Override @@ -431,6 +533,11 @@ public void read(org.apache.thrift.protocol.TProtocol prot, InsertEventRequestDa } } struct.setFilesAddedIsSet(true); + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.overwrite = iprot.readBool(); + struct.setOverwriteIsSet(true); + } } } diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php index f505208..c887309 100644 --- a/metastore/src/gen/thrift/gen-php/metastore/Types.php +++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -15721,6 +15721,10 @@ class InsertEventRequestData { * @var string[] */ public $filesAdded = null; + /** + * @var bool + */ + public $overwrite = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -15733,12 +15737,19 @@ class InsertEventRequestData { 'type' => TType::STRING, ), ), + 2 => array( + 'var' => 'overwrite', + 'type' => TType::BOOL, + ), ); } if (is_array($vals)) { if (isset($vals['filesAdded'])) { $this->filesAdded = $vals['filesAdded']; } + if (isset($vals['overwrite'])) { + $this->overwrite = $vals['overwrite']; + } } } @@ -15778,6 +15789,13 @@ class InsertEventRequestData { $xfer += $input->skip($ftype); } break; + case 2: + if ($ftype == TType::BOOL) { + $xfer += $input->readBool($this->overwrite); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -15808,6 +15826,11 @@ class InsertEventRequestData { } $xfer += $output->writeFieldEnd(); } + if ($this->overwrite !== null) { + $xfer += $output->writeFieldBegin('overwrite', TType::BOOL, 2); + $xfer += $output->writeBool($this->overwrite); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 8d88cd7..0ab0451 100644 --- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -10958,15 +10958,18 @@ class InsertEventRequestData: """ Attributes: - filesAdded + - overwrite """ thrift_spec = ( None, # 0 (1, TType.LIST, 'filesAdded', (TType.STRING,None), None, ), # 1 + (2, TType.BOOL, 'overwrite', None, None, ), # 2 ) - def __init__(self, filesAdded=None,): + def __init__(self, filesAdded=None, overwrite=None,): self.filesAdded = filesAdded + self.overwrite = overwrite def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -10987,6 +10990,11 @@ def read(self, iprot): iprot.readListEnd() else: iprot.skip(ftype) + elif fid == 2: + if ftype == TType.BOOL: + self.overwrite = iprot.readBool() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -11004,6 +11012,10 @@ def write(self, oprot): oprot.writeString(iter498) oprot.writeListEnd() oprot.writeFieldEnd() + if self.overwrite is not None: + oprot.writeFieldBegin('overwrite', TType.BOOL, 2) + oprot.writeBool(self.overwrite) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -11016,6 +11028,7 @@ def validate(self): def __hash__(self): value = 17 value = (value * 31) ^ hash(self.filesAdded) + value = (value * 31) ^ hash(self.overwrite) return value def __repr__(self): diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index 0964cd8..dcfbdbd 100644 --- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2461,9 +2461,11 @@ end class InsertEventRequestData include ::Thrift::Struct, ::Thrift::Struct_Union FILESADDED = 1 + OVERWRITE = 2 FIELDS = { - FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}} + FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}}, + OVERWRITE => {:type => ::Thrift::Types::BOOL, :name => 'overwrite', :optional => true} } def struct_fields; FIELDS; end diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index ef0bb3d..8eae0c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1554,8 +1554,10 @@ public Partition loadPartition(Path loadPath, Table tbl, Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath); alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString()); validatePartition(newTPart); - if (null != newFiles) { - fireInsertEvent(tbl, partSpec, newFiles); + if ((null != newFiles) || replace) { + fireInsertEvent(tbl, partSpec, newFiles, replace); + } else { + LOG.debug("No new files were created, and is not a replace. Skipping generating INSERT event."); } //column stats will be inaccurate @@ -1856,7 +1858,7 @@ public void loadTable(Path loadPath, String tableName, boolean replace, boolean throw new HiveException(e); } - fireInsertEvent(tbl, null, newFiles); + fireInsertEvent(tbl, null, newFiles, replace); } /** @@ -2059,7 +2061,7 @@ public Partition getPartition(Table tbl, Map partSpec, } else { alterPartitionSpec(tbl, partSpec, tpart, inheritTableSpecs, partPath); - fireInsertEvent(tbl, partSpec, newFiles); + fireInsertEvent(tbl, partSpec, newFiles,false); } } if (tpart == null) { @@ -2109,7 +2111,7 @@ private void alterPartitionSpecInMemory(Table tbl, tpart.getSd().setLocation(partPath); } - private void fireInsertEvent(Table tbl, Map partitionSpec, List newFiles) + private void fireInsertEvent(Table tbl, Map partitionSpec, List newFiles, boolean overwrite) throws HiveException { if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) { LOG.debug("Firing dml insert event"); @@ -2127,6 +2129,7 @@ private void fireInsertEvent(Table tbl, Map partitionSpec, List< } else { insertData.setFilesAdded(new ArrayList()); } + insertData.setOverwrite(overwrite); FireEventRequest rqst = new FireEventRequest(true, data); rqst.setDbName(tbl.getDbName()); rqst.setTableName(tbl.getTableName());