Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
None
None
Description
The public void compensate(TxEvent event) method of the CompositeOmegaCallback class can fail under concurrent access:
    public void compensate(TxEvent event) {
      Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
      if (serviceCallbacks.isEmpty()) {
        throw new AlphaException("No such omega callback found for service " + event.serviceName());
      }
      OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
      if (omegaCallback == null) {
        LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
        omegaCallback = serviceCallbacks.values().iterator().next();
      }
      try {
        omegaCallback.compensate(event);
      } catch (Exception e) {
        serviceCallbacks.values().remove(omegaCallback);
        throw e;
      }
    }
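A thread can pass the if (omegaCallback == null) check and then lose the CPU before it executes omegaCallback = serviceCallbacks.values().iterator().next(). In that window other threads may modify the serviceCallbacks map. On the alpha side there are currently two such cases: alpha receives an onDisconnected request from an omega and removes that omega instance from the map, or the thread running a pendingTask retries a compensation, fails, and executes the catch block

    catch (Exception e) {
      serviceCallbacks.values().remove(omegaCallback);
      throw e;
    }

which also removes the corresponding omega instance from the map. If the last callback for the service is removed inside that window, iterator().next() is called on an empty map and throws NoSuchElementException instead of the intended AlphaException.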
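The interleaving described above can also be forced deterministically with latches instead of sleeps. This is a minimal sketch, not part of the ServiceComb code base: the map holds plain strings as hypothetical stand-ins for OmegaCallback instances, the instance IDs are made up, and the removal performed by the main thread plays the role of either onDisconnected or a failed pendingTask retry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class CheckThenActRace {

  // Replays the reported interleaving with latches and returns whatever the
  // "compensating" thread threw, or null if it finished without an exception.
  public static Throwable reproduce() throws InterruptedException {
    // Hypothetical stand-in for one service's callbacks: instanceId -> callback.
    Map<String, String> serviceCallbacks = new ConcurrentHashMap<>();
    serviceCallbacks.put("instance-1", "callback-1");

    CountDownLatch passedNullCheck = new CountDownLatch(1);
    CountDownLatch entryRemoved = new CountDownLatch(1);
    AtomicReference<Throwable> failure = new AtomicReference<>();

    Thread compensator = new Thread(() -> {
      try {
        String callback = serviceCallbacks.get("instance-2"); // unknown instance
        if (callback == null) {
          passedNullCheck.countDown(); // pause here, like the injected sleep
          entryRemoved.await();
          // The map is empty by now, so next() has nothing to return.
          callback = serviceCallbacks.values().iterator().next();
        }
      } catch (Throwable t) {
        failure.set(t);
      }
    });
    compensator.start();

    passedNullCheck.await();
    // Another thread removes the only callback while the compensator is paused.
    serviceCallbacks.values().remove("callback-1");
    entryRemoved.countDown();

    compensator.join();
    return failure.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(reproduce()); // prints java.util.NoSuchElementException
  }
}
```

Because the latches fix the order of events, this variant fails on every run rather than depending on sleep timing.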
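Because this is a race condition, it is very unlikely to occur in practice, which makes it hard to find and reproduce. I reproduced it by modifying the source at two points: a sleep inside the if (omegaCallback == null) branch to widen the window, and a thrown exception in place of the real compensate call so that the catch block always removes the callback: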
    if (omegaCallback == null) {
      LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
      try {
        TimeUnit.SECONDS.sleep(2);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      omegaCallback = serviceCallbacks.values().iterator().next();
    }
    try {
      throw new RuntimeException();
      // omegaCallback.compensate(event);
    } catch (Exception e) {
The test code:
    @Test
    public void compensateWithConcurrency() throws InterruptedException {
      ConcurrentHashMap<String, OmegaCallback> serviceCallbacks = new ConcurrentHashMap<>();
      serviceCallbacks.put(instanceId1One, callback1One);
      callbacks.put(serviceName1, serviceCallbacks);

      // Thread 1: instanceId1Two is not registered, so it enters the null branch and sleeps.
      new Thread(new Runnable() {
        @Override
        public void run() {
          compositeOmegaCallback.compensate(eventOf(serviceName1, instanceId1Two, TxStartedEvent));
        }
      }).start();
      TimeUnit.SECONDS.sleep(1);

      // Thread 2: compensation on instanceId1One fails and removes the only callback,
      // so thread 1 then calls iterator().next() on an empty map.
      new Thread(new Runnable() {
        @Override
        public void run() {
          compositeOmegaCallback.compensate(eventOf(serviceName1, instanceId1One, TxStartedEvent));
        }
      }).start();
      TimeUnit.SECONDS.sleep(3);
    }
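One way to remove the gap between the null check and iterator().next() is to pick the fallback callback in a single operation that reports emptiness itself. The helper below is a hypothetical sketch under that assumption, not the change that resolved this issue; CallbackSelector, select and the IllegalStateException are illustrative names, and real code would throw AlphaException instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class CallbackSelector {

  // Hypothetical helper: prefer the callback registered for instanceId, otherwise
  // fall back to any remaining one. The fallback is a single findFirst() over the
  // values view, so there is no separate emptiness check that a concurrent removal
  // could invalidate; an empty map becomes an explicit exception.
  static <V> V select(Map<String, V> serviceCallbacks, String instanceId) {
    V callback = serviceCallbacks.get(instanceId);
    if (callback != null) {
      return callback;
    }
    return serviceCallbacks.values().stream()
        .findFirst()
        .orElseThrow(() -> new IllegalStateException(
            "No omega callback left for instance " + instanceId));
  }

  public static void main(String[] args) {
    Map<String, String> callbacks = new ConcurrentHashMap<>();
    callbacks.put("instance-1", "callback-1");
    System.out.println(select(callbacks, "instance-2")); // falls back to callback-1

    callbacks.clear();
    try {
      select(callbacks, "instance-2");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // No omega callback left for instance instance-2
    }
  }
}
```

The selected callback can still belong to an instance that disconnects before compensate runs; in the method above that case already ends in the catch block, which removes the callback and rethrows.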