Description
When multiple threads in peer accessors put to the same key in a replicated region that has a serial gateway sender, a ConcurrentCacheModificationException caught in LocalRegion.virtualPut causes notifyGatewaySender to be called, which puts the event into the queue. Because the AbstractUpdateOperation.doPutOrCreate method can call LocalRegion.virtualPut up to three times and can encounter a ConcurrentCacheModificationException on each call, the same event can be put in the queue twice but removed only once, which causes the unprocessedTokens map to accumulate events.
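The failure mode described above can be reduced to a small model. This is only a sketch, not Geode source: the class DuplicateEnqueueSketch, the ConcurrentModification exception, and the simplified method signatures are hypothetical stand-ins, and the sketch assumes that every attempt loses the concurrency check. The two flag combinations mirror the two attempts visible in the logged stacks.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the reported behavior. All names are hypothetical
// stand-ins; none of this is the actual Geode implementation.
class DuplicateEnqueueSketch {
  // Stand-in for ConcurrentCacheModificationException.
  static class ConcurrentModification extends RuntimeException {}

  // Stand-in for the serial gateway sender queue.
  final List<String> queue = new ArrayList<>();

  // Stand-in for LocalRegion.virtualPut. Assumption: the entry update always
  // fails the concurrency check, and the catch block still notifies the sender.
  boolean virtualPut(String eventId, boolean ifOld, boolean overwriteDestroyed) {
    try {
      throw new ConcurrentModification();
    } catch (ConcurrentModification e) {
      queue.add(eventId); // models notifyGatewaySender
      return false;       // the put itself did not succeed
    }
  }

  // Stand-in for AbstractUpdateOperation.doPutOrCreate: a failed attempt is
  // followed by another attempt with different flags.
  void doPutOrCreate(String eventId) {
    if (virtualPut(eventId, true, false)) {
      return;
    }
    virtualPut(eventId, false, true);
  }

  public static void main(String[] args) {
    DuplicateEnqueueSketch sketch = new DuplicateEnqueueSketch();
    sketch.doPutOrCreate("threadID=5;sequenceID=273");
    System.out.println("queued copies: " + sketch.queue.size());
  }
}
```

In this model a single logical put leaves two copies of the same event ID in the queue, because each failed attempt notifies the sender independently.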
Here are the two stacks:
[warn 2019/12/02 12:47:59.102 PST <P2P message reader for 10.255.202.119(gateway-ln-2:85182)<v97>:41004 unshared ordered uid=11 dom #2 port=59034> tid=0x61] XXX LocalRegion.virtualPut caught ConcurrentCacheModificationException about to notifyGatewaySender eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=5;sequenceID=273]; ifNew=false; ifOld=true; overwriteDestroyed=false; eventIdentity=329453507; eventValue=Trade[id=-1501795011; cusip=PVTL; shares=29; price=163; payloadLength=0 bytes]
java.lang.Exception
    at org.apache.geode.internal.cache.LocalRegion.virtualPut(LocalRegion.java:5591)
    at org.apache.geode.internal.cache.DistributedRegion.virtualPut(DistributedRegion.java:385)
    at org.apache.geode.internal.cache.LocalRegionDataView.putEntry(LocalRegionDataView.java:162)
    at org.apache.geode.internal.cache.LocalRegion.basicUpdate(LocalRegion.java:5561)
    at org.apache.geode.internal.cache.AbstractUpdateOperation.doPutOrCreate(AbstractUpdateOperation.java:182)
    at org.apache.geode.internal.cache.AbstractUpdateOperation$AbstractUpdateMessage.basicOperateOnRegion(AbstractUpdateOperation.java:287)
    at org.apache.geode.internal.cache.AbstractUpdateOperation$AbstractUpdateMessage.operateOnRegion(AbstractUpdateOperation.java:258)
    at org.apache.geode.internal.cache.DistributedCacheOperation$CacheOperationMessage.basicProcess(DistributedCacheOperation.java:1206)
    at org.apache.geode.internal.cache.DistributedCacheOperation$CacheOperationMessage.process(DistributedCacheOperation.java:1108)
    at org.apache.geode.distributed.internal.DistributionMessage.scheduleAction(DistributionMessage.java:372)
    at org.apache.geode.distributed.internal.DistributionMessage.schedule(DistributionMessage.java:427)
[warn 2019/12/02 12:47:59.108 PST <P2P message reader for 10.255.202.119(gateway-ln-2:85182)<v97>:41004 unshared ordered uid=11 dom #2 port=59034> tid=0x61] XXX LocalRegion.virtualPut caught ConcurrentCacheModificationException about to notifyGatewaySender eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=5;sequenceID=273]; ifNew=false; ifOld=false; overwriteDestroyed=true; eventIdentity=329453507; eventValue=Trade[id=-1501795011; cusip=PVTL; shares=29; price=163; payloadLength=0 bytes]
java.lang.Exception
    at org.apache.geode.internal.cache.LocalRegion.virtualPut(LocalRegion.java:5591)
    at org.apache.geode.internal.cache.DistributedRegion.virtualPut(DistributedRegion.java:385)
    at org.apache.geode.internal.cache.LocalRegionDataView.putEntry(LocalRegionDataView.java:162)
    at org.apache.geode.internal.cache.LocalRegion.basicUpdate(LocalRegion.java:5561)
    at org.apache.geode.internal.cache.AbstractUpdateOperation.doPutOrCreate(AbstractUpdateOperation.java:194)
    at org.apache.geode.internal.cache.AbstractUpdateOperation$AbstractUpdateMessage.basicOperateOnRegion(AbstractUpdateOperation.java:287)
    at org.apache.geode.internal.cache.AbstractUpdateOperation$AbstractUpdateMessage.operateOnRegion(AbstractUpdateOperation.java:258)
    at org.apache.geode.internal.cache.DistributedCacheOperation$CacheOperationMessage.basicProcess(DistributedCacheOperation.java:1206)
    at org.apache.geode.internal.cache.DistributedCacheOperation$CacheOperationMessage.process(DistributedCacheOperation.java:1108)
Here are the corresponding puts into the queue:
[warn 2019/12/02 12:47:59.104 PST <P2P message reader for 10.255.202.119(gateway-ln-2:85182)<v97>:41004 unshared ordered uid=11 dom #2 port=59034> tid=0x61] XXX SerialGatewaySenderQueue.putAndGetKey key=3625; eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273]; eventValue=Trade[id=-1501795011; cusip=PVTL; shares=29; price=163.08897399902344; payloadLength=0 bytes]
[warn 2019/12/02 12:47:59.110 PST <P2P message reader for 10.255.202.119(gateway-ln-2:85182)<v97>:41004 unshared ordered uid=11 dom #2 port=59034> tid=0x61] XXX SerialGatewaySenderQueue.putAndGetKey key=3635; eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273]; eventValue=Trade[id=-1501795011; cusip=PVTL; shares=29; price=163.08897399902344; payloadLength=0 bytes]
On the secondary, when the event is received via normal replication, it is added to the unprocessedEvents map:
[warn 2019/12/02 12:47:59.100 PST <P2P message reader for 10.255.202.119(accessor-ln-1:85194)<v98>:41005 unshared ordered uid=13 dom #1 port=59022> tid=0x58] SerialGatewaySenderEventProcessor.basicHandleSecondaryEvent put unprocessedEvents eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273]
The first replication from the primary queue is then received, which removes the event from the unprocessedEvents map:
[warn 2019/12/02 12:47:59.104 PST <P2P message reader for 10.255.202.119(gateway-ln-1:85170)<v96>:41003 unshared ordered uid=18 dom #3 port=59052> tid=0x68] XXX SerialSecondaryGatewayListener.afterCreate senderEvent=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273]
[warn 2019/12/02 12:47:59.104 PST <Queued Gateway Listener Thread1> tid=0x5e] SerialGatewaySenderEventProcessor.basicHandlePrimaryEvent removed unprocessedEvents eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273]; value=org.apache.geode.internal.cache.wan.AbstractGatewaySender$EventWrapper@3f6df03b
Then the second replication from the primary queue is received, which incorrectly adds the event to the unprocessedTokens map, where it stays forever:
[warn 2019/12/02 12:47:59.110 PST <P2P message reader for 10.255.202.119(gateway-ln-1:85170)<v96>:41003 unshared ordered uid=18 dom #3 port=59052> tid=0x68] XXX SerialSecondaryGatewayListener.afterCreate senderEvent=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273]
[warn 2019/12/02 12:47:59.110 PST <Queued Gateway Listener Thread1> tid=0x5e] SerialGatewaySenderEventProcessor.basicHandlePrimaryEvent put unprocessedTokens eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273]; value=1575319799110; size=914
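The secondary-side sequence in these logs can be modeled as follows. This is a minimal sketch that covers only the transitions the log lines show. The class SecondaryBookkeepingSketch and its method names are hypothetical, and the real SerialGatewaySenderEventProcessor likely does more than this, for example token cleanup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the secondary's bookkeeping, limited to the
// transitions visible in the logs. Not the actual Geode implementation.
class SecondaryBookkeepingSketch {
  final Map<String, Object> unprocessedEvents = new HashMap<>();
  final Map<String, Long> unprocessedTokens = new HashMap<>();

  // Event arrives through normal region replication.
  void handleSecondaryEvent(String eventId, Object event) {
    unprocessedEvents.put(eventId, event);
  }

  // Event arrives through replication of the primary's queue.
  void handlePrimaryEvent(String eventId, long nowMillis) {
    if (unprocessedEvents.remove(eventId) == null) {
      // No matching unprocessed event: record a token and wait for one.
      unprocessedTokens.put(eventId, nowMillis);
    }
  }

  public static void main(String[] args) {
    SecondaryBookkeepingSketch secondary = new SecondaryBookkeepingSketch();
    String id = "threadID=0x30002|5;sequenceID=273";
    secondary.handleSecondaryEvent(id, new Object());
    secondary.handlePrimaryEvent(id, 1L); // first queue copy: removes the event
    secondary.handlePrimaryEvent(id, 2L); // duplicate copy: leaves a token
    System.out.println("tokens left: " + secondary.unprocessedTokens.size());
  }
}
```

Under these assumptions, the duplicate queue entry has no matching unprocessed event to remove, so it leaves a token that no later region event will clear.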
The proposed solution to this issue is to add two boolean arguments to the LocalRegion.virtualPut method: one to control whether a ConcurrentCacheModificationException should result in notifying the bridge clients and gateway senders, and another to control whether any ConcurrentCacheModificationException encountered should be thrown or suppressed. These arguments allow the AbstractUpdateOperation.doPutOrCreate method to (1) prevent calls to LocalRegion.virtualPut that follow a ConcurrentCacheModificationException from notifying the gateway sender again, and (2) know whether the LocalRegion.virtualPut method failed specifically because of a ConcurrentCacheModificationException.
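One way the two flags might interact is sketched below in a simplified model. The parameter names notifyOnConflict and throwOnConflict, the class GuardedRetrySketch, and the retry loop are all assumptions made for illustration. The actual signature and control flow in Geode may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the proposed fix. Names and structure are
// illustrative only and do not reproduce the Geode source.
class GuardedRetrySketch {
  // Stand-in for ConcurrentCacheModificationException.
  static class ConcurrentModification extends RuntimeException {}

  // Stand-in for the serial gateway sender queue.
  final List<String> queue = new ArrayList<>();

  // Stand-in for LocalRegion.virtualPut with the two added flags.
  // Assumption: every attempt loses the concurrency check.
  boolean virtualPut(String eventId, boolean notifyOnConflict, boolean throwOnConflict) {
    try {
      throw new ConcurrentModification();
    } catch (ConcurrentModification e) {
      if (notifyOnConflict) {
        queue.add(eventId); // models notifyGatewaySender
      }
      if (throwOnConflict) {
        throw e; // lets the caller see why the put failed
      }
      return false;
    }
  }

  // Stand-in for AbstractUpdateOperation.doPutOrCreate: after the first
  // conflict, later attempts no longer notify the gateway sender.
  void doPutOrCreate(String eventId) {
    boolean notifyOnConflict = true;
    for (int attempt = 0; attempt < 2; attempt++) {
      try {
        if (virtualPut(eventId, notifyOnConflict, true)) {
          return;
        }
      } catch (ConcurrentModification e) {
        notifyOnConflict = false;
      }
    }
  }

  public static void main(String[] args) {
    GuardedRetrySketch sketch = new GuardedRetrySketch();
    sketch.doPutOrCreate("threadID=5;sequenceID=273");
    System.out.println("queued copies: " + sketch.queue.size());
  }
}
```

With the guard in place, the model enqueues the event once regardless of how many attempts hit a conflict.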