commit c9410f2c22341765cd3d4994c4889a0f63d5b638 Author: Vihang Karajgaonkar Date: Fri Jun 7 18:54:12 2019 -0700 HIVE-21851 : FireEventResponse should include event id when available diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index edf861f5b368ad19ea389d5e7d08839305ee01e0..c8290c07fc6e13e9a5ab576f4be39e9de2ea34d5 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -19,6 +19,7 @@ package org.apache.hive.hcatalog.listener; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; @@ -49,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FireEventRequest; import org.apache.hadoop.hive.metastore.api.FireEventRequestData; +import org.apache.hadoop.hive.metastore.api.FireEventResponse; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.FunctionType; import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; @@ -1088,7 +1090,10 @@ public void insertTable() throws Exception { rqst.setDbName(defaultDbName); rqst.setTableName(tblName); // Event 2 - msClient.fireListenerEvent(rqst); + FireEventResponse response = msClient.fireListenerEvent(rqst); + assertTrue("Event id must be set in the fireEvent response", response.isSetEventId()); + assertNotNull(response.getEventId()); + assertTrue(response.getEventId() != -1); // Get notifications from metastore NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); @@ -1158,7 +1163,10 @@ public void insertPartition() throws Exception { rqst.setTableName(tblName); rqst.setPartitionVals(partCol1Vals); // Event 3 - msClient.fireListenerEvent(rqst); + FireEventResponse response = msClient.fireListenerEvent(rqst); + assertTrue("Event id must be set in the fireEvent response", response.isSetEventId()); + assertNotNull(response.getEventId()); + assertTrue(response.getEventId() != -1); // Get notifications from metastore NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FireEventResponse.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FireEventResponse.java index 9125d865435fedae81aac19c1e947952582b19cc..4889b9612b251de9de9ed46a568dc3cd1647551a 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FireEventResponse.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FireEventResponse.java @@ -38,6 +38,7 @@ @org.apache.hadoop.classification.InterfaceAudience.Public @org.apache.hadoop.classification.InterfaceStability.Stable public class FireEventResponse implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("FireEventResponse"); + private static final org.apache.thrift.protocol.TField EVENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("eventId", org.apache.thrift.protocol.TType.I64, (short)1); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -45,10 +46,11 @@ schemes.put(TupleScheme.class, new FireEventResponseTupleSchemeFactory()); } + private long eventId; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { -; + EVENT_ID((short)1, "eventId"); private static final Map byName = new HashMap(); @@ -63,6 +65,8 @@ */ public static _Fields findByThriftId(int fieldId) { switch(fieldId) { + case 1: // EVENT_ID + return EVENT_ID; default: return null; } @@ -101,9 +105,16 @@ public String getFieldName() { return _fieldName; } } + + // isset id assignments + private static final int __EVENTID_ISSET_ID = 0; + private byte __isset_bitfield = 0; + private static final _Fields optionals[] = {_Fields.EVENT_ID}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.EVENT_ID, new org.apache.thrift.meta_data.FieldMetaData("eventId", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(FireEventResponse.class, metaDataMap); } @@ -115,6 +126,8 @@ public FireEventResponse() { * Performs a deep copy on other. */ public FireEventResponse(FireEventResponse other) { + __isset_bitfield = other.__isset_bitfield; + this.eventId = other.eventId; } public FireEventResponse deepCopy() { @@ -123,15 +136,50 @@ public FireEventResponse deepCopy() { @Override public void clear() { + setEventIdIsSet(false); + this.eventId = 0; + } + + public long getEventId() { + return this.eventId; + } + + public void setEventId(long eventId) { + this.eventId = eventId; + setEventIdIsSet(true); + } + + public void unsetEventId() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __EVENTID_ISSET_ID); + } + + /** Returns true if field eventId is set (has been assigned a value) and false otherwise */ + public boolean isSetEventId() { + return EncodingUtils.testBit(__isset_bitfield, __EVENTID_ISSET_ID); + } + + public void setEventIdIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __EVENTID_ISSET_ID, value); } public void setFieldValue(_Fields field, Object value) { switch (field) { + case EVENT_ID: + if (value == null) { + unsetEventId(); + } else { + setEventId((Long)value); + } + break; + } } public Object getFieldValue(_Fields field) { switch (field) { + case EVENT_ID: + return getEventId(); + } throw new IllegalStateException(); } @@ -143,6 +191,8 @@ public boolean isSet(_Fields field) { } switch (field) { + case EVENT_ID: + return isSetEventId(); } throw new IllegalStateException(); } @@ -160,6 +210,15 @@ public boolean equals(FireEventResponse that) { if (that == null) return false; + boolean this_present_eventId = true && this.isSetEventId(); + boolean that_present_eventId = true && that.isSetEventId(); + if (this_present_eventId || that_present_eventId) { + if (!(this_present_eventId && that_present_eventId)) + return false; + if (this.eventId != that.eventId) + return false; + } + return true; } @@ -167,6 +226,11 @@ public boolean equals(FireEventResponse that) { public int hashCode() { List list = new ArrayList(); + boolean present_eventId = true && (isSetEventId()); + list.add(present_eventId); + if (present_eventId) + list.add(eventId); + return list.hashCode(); } @@ -178,6 +242,16 @@ public int compareTo(FireEventResponse other) { int lastComparison = 0; + lastComparison = Boolean.valueOf(isSetEventId()).compareTo(other.isSetEventId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetEventId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.eventId, other.eventId); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -198,6 +272,11 @@ public String toString() { StringBuilder sb = new StringBuilder("FireEventResponse("); boolean first = true; + if (isSetEventId()) { + sb.append("eventId:"); + sb.append(this.eventId); + first = false; + } sb.append(")"); return sb.toString(); } @@ -217,6 +296,8 @@ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOExcept private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bitfield = 0; read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -241,6 +322,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, FireEventResponse s break; } switch (schemeField.id) { + case 1: // EVENT_ID + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.eventId = iprot.readI64(); + struct.setEventIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -254,6 +343,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, FireEventResponse struct.validate(); oprot.writeStructBegin(STRUCT_DESC); + if (struct.isSetEventId()) { + oprot.writeFieldBegin(EVENT_ID_FIELD_DESC); + oprot.writeI64(struct.eventId); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -271,11 +365,24 @@ public FireEventResponseTupleScheme getScheme() { @Override public void write(org.apache.thrift.protocol.TProtocol prot, FireEventResponse struct) throws org.apache.thrift.TException { TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetEventId()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetEventId()) { + oprot.writeI64(struct.eventId); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, FireEventResponse struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.eventId = iprot.readI64(); + struct.setEventIdIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php index fa5c20b7c38638cad96117dd71887a09a99d264c..ac1f9ed7e47987ab9a97f83bc7a68fbd17928476 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php @@ -25086,12 +25086,25 @@ class FireEventRequest { class FireEventResponse { static $_TSPEC; + /** + * @var int + */ + public $eventId = null; - public function __construct() { + public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { self::$_TSPEC = array( + 1 => array( + 'var' => 'eventId', + 'type' => TType::I64, + ), ); } + if (is_array($vals)) { + if (isset($vals['eventId'])) { + $this->eventId = $vals['eventId']; + } + } } public function getName() { @@ -25113,6 +25126,13 @@ class FireEventResponse { } switch ($fid) { + case 1: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->eventId); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -25126,6 +25146,11 @@ class FireEventResponse { public function write($output) { $xfer = 0; $xfer += $output->writeStructBegin('FireEventResponse'); + if ($this->eventId !== null) { + $xfer += $output->writeFieldBegin('eventId', TType::I64, 1); + $xfer += $output->writeI64($this->eventId); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 9ab129cf909e97df8e5b5be05686fbb52cc826c3..f9c622f7ff2d65ae2191a8141eb93527bc820da4 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -17482,10 +17482,19 @@ def __ne__(self, other): return not (self == other) class FireEventResponse: + """ + Attributes: + - eventId + """ thrift_spec = ( + None, # 0 + (1, TType.I64, 'eventId', None, None, ), # 1 ) + def __init__(self, eventId=None,): + self.eventId = eventId + def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) @@ -17495,6 +17504,11 @@ def read(self, iprot): (fname, ftype, fid) = iprot.readFieldBegin() if ftype == TType.STOP: break + if fid == 1: + if ftype == TType.I64: + self.eventId = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -17505,6 +17519,10 @@ def write(self, oprot): oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) return oprot.writeStructBegin('FireEventResponse') + if self.eventId is not None: + oprot.writeFieldBegin('eventId', TType.I64, 1) + oprot.writeI64(self.eventId) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -17514,6 +17532,7 @@ def validate(self): def __hash__(self): value = 17 + value = (value * 31) ^ hash(self.eventId) return value def __repr__(self): diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index 9cb5679352117777a37f92932e6ab1bb97e29987..f493f1fa5ed038a0f06fb354acd2dc65ec9b6a08 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -3884,9 +3884,10 @@ end class FireEventResponse include ::Thrift::Struct, ::Thrift::Struct_Union + EVENTID = 1 FIELDS = { - + EVENTID => {:type => ::Thrift::Types::I64, :name => 'eventId', :optional => true} } def struct_fields; FIELDS; end diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index 3efd56a998b7017f8fa74dc005dab26512547632..6f321d10804b4a2c81363321208fde251b4eb93d 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -1271,7 +1271,7 @@ struct FireEventRequest { } struct FireEventResponse { - // NOP for now, this is just a place holder for future responses + 1: optional i64 eventId } struct WriteNotificationLogRequest { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 94698e6771890a050b00b374ca0ee926f768aa0e..9201ae517094564d79536f0679e5c8720b8237c0 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -8538,8 +8538,13 @@ public FireEventResponse fire_listener_event(FireEventRequest rqst) throws TExce */ MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.INSERT, event); MetaStoreListenerNotifier.notifyEvent(listeners, EventType.INSERT, event); - - return new FireEventResponse(); + FireEventResponse response = new FireEventResponse(); + if (event.getParameters() != null && event.getParameters() + .containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)) { + response.setEventId(Long.valueOf(event.getParameters() + .get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME))); + } + return response; default: throw new TException("Event type " + rqst.getData().getSetField().toString()