Apache ServiceComb / SCB-1081

CompositeOmegaCallback's compensate(TxEvent event) method has concurrency issues


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: pack-0.3.0
    • Component/s: Saga
    • Labels: None

    Description

      The public void compensate(TxEvent event) method of the CompositeOmegaCallback class can fail with an exception under concurrent access.

       

      public void compensate(TxEvent event) {
        Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
        if (serviceCallbacks.isEmpty()) {
          throw new AlphaException("No such omega callback found for service " + event.serviceName());
        }
        OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
        if (omegaCallback == null) {
          LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
          // Race window: another thread may empty serviceCallbacks after the isEmpty() check above.
          omegaCallback = serviceCallbacks.values().iterator().next();
        }
        try {
          omegaCallback.compensate(event);
        } catch (Exception e) {
          // A failed compensation evicts this instance from the shared map.
          serviceCallbacks.values().remove(omegaCallback);
          throw e;
        }
      }
      

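      A thread may pass the if (omegaCallback == null) check and then lose the CPU before it executes omegaCallback = serviceCallbacks.values().iterator().next(). In the meantime, other threads can modify the serviceCallbacks map. On the alpha side there are currently two such cases. First, when alpha receives an onDisconnected request from an omega, it removes that omega instance from the map. Second, when the thread running a pendingTask retries a compensation and the retry fails, it executes the following catch block, which also removes the corresponding omega instance from the map:

      catch (Exception e) {
        serviceCallbacks.values().remove(omegaCallback);
        throw e;
      }

      If either path removes the last instance, the map is empty by the time the first thread resumes, and serviceCallbacks.values().iterator().next() throws NoSuchElementException.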
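The interleaving described above can be forced deterministically, without sleeps. The standalone program below is a simplified model, not ServiceComb code: `CheckThenActRaceDemo`, its `pickCallback` method, and the string-valued map are hypothetical stand-ins for CompositeOmegaCallback and its OmegaCallback map. Two CountDownLatch instances hold the worker thread between the null check and the iterator().next() call while the main thread removes the only entry, playing the role of onDisconnected or a failed compensation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CheckThenActRaceDemo {

  // Hypothetical stand-in for one service's instanceId -> callback map.
  static final Map<String, String> serviceCallbacks = new ConcurrentHashMap<>();

  // Same check-then-act shape as compensate(): exact instance first, else "any" instance.
  static String pickCallback(String instanceId, CountDownLatch inWindow, CountDownLatch resume)
      throws InterruptedException {
    if (serviceCallbacks.isEmpty()) {
      throw new IllegalStateException("No callback for service");
    }
    String callback = serviceCallbacks.get(instanceId);
    if (callback == null) {
      inWindow.countDown(); // tell the main thread we have passed both checks
      resume.await();       // stay in the race window until the map has been emptied
      callback = serviceCallbacks.values().iterator().next(); // throws if the map is now empty
    }
    return callback;
  }

  static String run() throws InterruptedException {
    serviceCallbacks.clear();
    serviceCallbacks.put("instance-1", "callback-1");
    CountDownLatch inWindow = new CountDownLatch(1);
    CountDownLatch resume = new CountDownLatch(1);
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      // The worker asks for an unregistered instance, so it takes the fallback branch.
      Future<String> result = pool.submit(() -> pickCallback("instance-2", inWindow, resume));
      inWindow.await();
      // Evict the last instance while the worker is parked inside the race window.
      serviceCallbacks.values().remove("callback-1");
      resume.countDown();
      try {
        return "picked " + result.get();
      } catch (ExecutionException e) {
        // Report which exception escaped from the worker thread.
        return e.getCause().getClass().getSimpleName();
      }
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run()); // prints NoSuchElementException
  }
}
```

Unlike a sleep-based reproduction, the latches make the failing interleaving happen on every run, and the exception is surfaced through Future.get() instead of being printed by the worker thread's default uncaught-exception handler.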

       

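      Because this is a race condition, it occurs only rarely, which makes it hard to notice and reproduce. I reproduced it by modifying the source at specific points: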

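      if (omegaCallback == null) {
        LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
        try {
          // Added for reproduction: pause inside the race window so another
          // thread can remove the last callback from serviceCallbacks.
          TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        omegaCallback = serviceCallbacks.values().iterator().next();
      }

      try {
        // Added for reproduction: simulate a failed compensation so that
        // the catch block removes the callback from the map.
        throw new RuntimeException();
        // omegaCallback.compensate(event);
      } catch (Exception e) {
        serviceCallbacks.values().remove(omegaCallback);
        throw e;
      }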

       

      Here is the test code:

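      @Test
      public void compensateWithConcurrency() throws InterruptedException {
        // Only instanceId1One is registered for serviceName1.
        ConcurrentHashMap<String, OmegaCallback> serviceCallbacks = new ConcurrentHashMap<>();
        serviceCallbacks.put(instanceId1One, callback1One);
        callbacks.put(serviceName1, serviceCallbacks);

        // Thread 1: instanceId1Two is not registered, so it enters the fallback branch
        // and sleeps for 2 s (patched code) before calling iterator().next().
        new Thread(() -> compositeOmegaCallback.compensate(
            eventOf(serviceName1, instanceId1Two, TxStartedEvent))).start();

        TimeUnit.SECONDS.sleep(1);

        // Thread 2: its compensation fails (patched code), so the catch block removes
        // callback1One and empties the map while thread 1 is still asleep.
        new Thread(() -> compositeOmegaCallback.compensate(
            eventOf(serviceName1, instanceId1One, TxStartedEvent))).start();

        // Let thread 1 wake up; it then fails with NoSuchElementException. Exceptions in
        // these threads are printed to the console, they do not fail the JUnit test.
        TimeUnit.SECONDS.sleep(3);
      }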
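A minimal sketch of one possible way to close the window, assuming the per-service maps are ConcurrentHashMap instances as in the test above. `FallbackLookup` and its `pickCallback` helper are hypothetical names, not ServiceComb code: the fallback calls hasNext() and next() on the same iterator instead of relying on the earlier isEmpty() check, and reports an emptied map as Optional.empty() rather than letting NoSuchElementException escape.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

class FallbackLookup {

  // Hypothetical helper: returns the callback registered for instanceId or, failing
  // that, any remaining callback. It never trusts an earlier isEmpty() check.
  static <V> Optional<V> pickCallback(Map<String, V> serviceCallbacks, String instanceId) {
    V callback = serviceCallbacks.get(instanceId);
    if (callback != null) {
      return Optional.of(callback);
    }
    // Test and fetch on the same iterator. With ConcurrentHashMap's weakly consistent
    // iterator, hasNext() == true means next() returns an element, although that
    // element may have been removed concurrently in the meantime.
    Iterator<V> it = serviceCallbacks.values().iterator();
    return it.hasNext() ? Optional.of(it.next()) : Optional.empty();
  }

  public static void main(String[] args) {
    Map<String, String> callbacks = new ConcurrentHashMap<>();
    callbacks.put("instance-1", "callback-1");
    System.out.println(pickCallback(callbacks, "instance-2")); // Optional[callback-1]
    callbacks.clear();
    System.out.println(pickCallback(callbacks, "instance-2")); // Optional.empty
  }
}
```

A caller could turn an empty result into the existing AlphaException, for example with orElseThrow. Because the chosen callback may belong to an instance that has just disconnected, the caller still needs to handle a compensation that fails.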


People

    • Willem Jiang (njiang)
    • takumiCX

    • Votes: 0
    • Watchers: 2


Time Tracking

    • Original Estimate: Not Specified
    • Remaining Estimate: 0h
    • Time Spent: 10m