Spark / SPARK-13652

TransportClient.sendRpcSync returns wrong results

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.0
    • Fix Version/s: 1.6.2, 2.0.0
    • Component/s: None
    • Labels: None

      Description

      TransportClient is not thread-safe: when it is called from multiple threads, messages are not encoded and decoded correctly. The code below reproduces the problem and prints mismatched request/response pairs.

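      // Test.java
      import java.io.IOException;
      import java.nio.ByteBuffer;
      import java.util.ArrayList;
      import java.util.HashMap;
      import java.util.LinkedList;
      import java.util.List;

      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import org.apache.spark.network.TransportContext;
      import org.apache.spark.network.client.TransportClientFactory;
      import org.apache.spark.network.server.NoOpRpcHandler;
      import org.apache.spark.network.server.TransportServer;
      import org.apache.spark.network.server.TransportServerBootstrap;
      import org.apache.spark.network.util.MapConfigProvider;
      import org.apache.spark.network.util.TransportConf;

      public class Test {

          public static void main(String[] args) throws IOException, InterruptedException {
              // Echo server: RankHandler replies with exactly the bytes it receives.
              TransportServer server = new TransportContext(
                      new TransportConf("test", new MapConfigProvider(new HashMap<String, String>())),
                      new RankHandler()).createServer(8081, new LinkedList<TransportServerBootstrap>());

              TransportContext context = new TransportContext(
                      new TransportConf("test", new MapConfigProvider(new HashMap<String, String>())),
                      new NoOpRpcHandler(), true);
              final TransportClientFactory clientFactory = context.createClientFactory();

              // Ten threads each send 0..999 and expect every value echoed back unchanged.
              List<Thread> ts = new ArrayList<>();
              for (int i = 0; i < 10; i++) {
                  ts.add(new Thread(new Runnable() {
                      @Override
                      public void run() {
                          for (int j = 0; j < 1000; j++) {
                              try {
                                  // Request payload: the loop counter as an 8-byte long.
                                  ByteBuf buf = Unpooled.buffer(8);
                                  buf.writeLong((long) j);
                                  ByteBuffer byteBuffer = clientFactory.createClient("localhost", 8081)
                                          .sendRpcSync(buf.nioBuffer(), Long.MAX_VALUE);

                                  // The echoed value should equal the value sent.
                                  long response = byteBuffer.getLong();
                                  if (response != j) {
                                      System.err.println("send:" + j + ",response:" + response);
                                  }
                              } catch (IOException e) {
                                  e.printStackTrace();
                              }
                          }
                      }
                  }));
                  ts.get(i).start();
              }
              for (Thread t : ts) {
                  t.join();
              }
              server.close();
          }
      }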
      
      
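      // RankHandler.java
      import java.nio.ByteBuffer;

      import org.apache.spark.network.client.RpcResponseCallback;
      import org.apache.spark.network.client.TransportClient;
      import org.apache.spark.network.server.OneForOneStreamManager;
      import org.apache.spark.network.server.RpcHandler;
      import org.apache.spark.network.server.StreamManager;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      // Echo handler: answers every RPC with the message it received.
      public class RankHandler extends RpcHandler {

          private final Logger logger = LoggerFactory.getLogger(RankHandler.class);
          private final StreamManager streamManager;

          public RankHandler() {
              this.streamManager = new OneForOneStreamManager();
          }

          @Override
          public void receive(TransportClient client, ByteBuffer msg, RpcResponseCallback callback) {
              // Send the request bytes straight back as the response.
              callback.onSuccess(msg);
          }

          @Override
          public StreamManager getStreamManager() {
              return streamManager;
          }
      }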
      

      It prints output like the following:
      send:221,response:222
      send:233,response:234
      send:312,response:313
      send:358,response:359
      ...
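
The pattern above (each response is the value sent plus one) would be consistent with the caller reading a ByteBuffer whose backing memory has already been reused for a later response. The report does not confirm this cause, so the following is only a minimal sketch under that assumption. It uses plain java.nio with no Spark classes; `DefensiveCopySketch`, `SHARED`, `deliver`, and `copyOf` are hypothetical names introduced for illustration. It shows how a reference to a reused buffer can observe a later value, and how a defensive copy avoids that.

```java
import java.nio.ByteBuffer;

public class DefensiveCopySketch {

    // Hypothetical stand-in for a transport-owned buffer that is reused
    // for the next response once the current one has been handed out.
    static final ByteBuffer SHARED = ByteBuffer.allocate(8);

    // Simulates a response arriving: the shared buffer is overwritten in place.
    static ByteBuffer deliver(long value) {
        SHARED.clear();
        SHARED.putLong(value);
        SHARED.flip();
        return SHARED;
    }

    // Copies the readable bytes into a fresh buffer that the caller owns.
    static ByteBuffer copyOf(ByteBuffer response) {
        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
        copy.put(response.duplicate()); // duplicate() leaves the source position untouched
        copy.flip();                    // make the copy readable from the start
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer unsafe = deliver(221);       // keeps a reference to shared memory
        ByteBuffer safe = copyOf(deliver(221)); // owns its own bytes
        deliver(222);                           // the next response reuses the buffer

        System.out.println("without copy: " + unsafe.getLong(0));
        System.out.println("with copy:    " + safe.getLong(0));
    }
}
```

If the assumption holds, a caller could apply the same kind of copy to a response before handing it to another thread. Whether TransportClient needs this, or already does it in the fixed versions, is not stated in this report.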

        Attachments

        1. Test.java
          2 kB
          huangyu
        2. RankHandler.java
          0.9 kB
          huangyu

            People

            • Assignee: Shixiong Zhu (zsxwing)
            • Reporter: huangyu (huang_yuu)