  1. Tajo
  2. TAJO-908

Fetcher does not retry, when pull server connection was closed

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0, 0.9.0
    • Fix Version/s: 0.9.0
    • Component/s: Data Shuffle
    • Labels:
      None

      Description

      The exception messages are as follows:

      PullServer
      2014-07-04 09:45:21,150 ERROR pullserver.TajoPullServerService (TajoPullServerService.java:exceptionCaught(533)) - PullServer error:
      java.io.IOException: Connection timed out
              at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
              at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
              at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
              at sun.nio.ch.IOUtil.read(IOUtil.java:192)
              at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
              at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64)
              at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
              at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
              at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
              at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              at java.lang.Thread.run(Thread.java:745)
      
      Fetcher
       org.jboss.netty.channel.SimpleChannelUpstreamHandler
      WARN: EXCEPTION, please implement org.apache.tajo.worker.Fetcher$HttpClientHandler.exceptionCaught() for proper handling.
      java.io.IOException: Connection reset by peer
              at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
              at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
              at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
              at sun.nio.ch.IOUtil.read(IOUtil.java:192)
              at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
              at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64)
              at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
              at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
              at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
              at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              at java.lang.Thread.run(Thread.java:745)
      
      2014-07-04 10:56:44,561 INFO  worker.Task (Task.java:waitForFetch(375)) - ta_1404438787481_0001_000004_000029_00 All fetches are done!
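
The logs above show the fetch being reported as done even though the connection was reset. The following is a minimal sketch of the retry behavior the issue title asks for, not the code from the actual patch. The names RetryingFetch, fetchWithRetry, maxRetries, and delayMillis are hypothetical, and a plain Callable stands in for one fetch attempt.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical helper for illustration; not part of Tajo.
class RetryingFetch {
    /**
     * Runs the attempt, retrying on IOException up to maxRetries extra times.
     * Rethrows the last IOException if every attempt fails.
     */
    public static <T> T fetchWithRetry(Callable<T> attempt, int maxRetries, long delayMillis)
            throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        IOException last = null;
        for (int tries = 0; tries <= maxRetries; tries++) {
            try {
                return attempt.call();
            } catch (IOException e) {
                // For example "Connection reset by peer": remember it and retry.
                last = e;
                if (tries < maxRetries) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        // All attempts failed: surface the error instead of reporting success.
        throw last;
    }
}
```

The point of the sketch is that a failed attempt is either retried or reported as a failure, and is never silently counted as a finished fetch.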
      

        Activity

        githubbot ASF GitHub Bot added a comment -

        GitHub user jinossy opened a pull request:

        https://github.com/apache/tajo/pull/58

        TAJO-908: Fetcher does not retry, when pull server connection was closed

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/jinossy/tajo TAJO-908

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/tajo/pull/58.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #58


        commit aa17a6f77a03a85b6bd25066a516d00c40791582
        Author: jinossy <jinossy@gmail.com>
        Date: 2014-07-04T16:13:37Z

        TAJO-908: Fetcher does not retry, when pull server connection was closed

        commit ad13df133cbf847c1ace4f9afc29805b4fd72c25
        Author: jinossy <jinossy@gmail.com>
        Date: 2014-07-04T16:17:52Z

        added file resource cleanup


        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/58#discussion_r14805007

        --- Diff: tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java ---
        @@ -96,29 +113,83 @@ public void testAdjustFetchProcess() {
           @Test
           public void testStatus() throws Exception {
             Random rnd = new Random();
        -    FileWriter writer = new FileWriter(INPUT_DIR + "data");
        -    String data;
        +    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
        +    String sid = "1";
        +    String ta = "1_0";
        +    String partId = "1";
        +
        +    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
        +    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
        +
        +    FSDataOutputStream stream = LocalFileSystem.get(conf).create(new Path(dataPath), true);
             for (int i = 0; i < 100; i++) {
        -      data = ""+rnd.nextInt();
        -      writer.write(data);
        +      String data = ""+rnd.nextInt();
        +      stream.write(data.getBytes());
             }
        -    writer.flush();
        -    writer.close();
        +    stream.flush();
        +    stream.close();
        +
        +    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
        +    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
        +    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
        +
        +    fetcher.get();
        +    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
        +  }
        -    DataRetriever ret = new DirectoryRetriever(INPUT_DIR);
        -    final HttpDataServer server = new HttpDataServer(
        -        NetUtils.createSocketAddr("127.0.0.1:0"), ret);
        -    server.start();
        -    InetSocketAddress addr = server.getBindAddress();
        +  @Test
        +  public void testEmptyFileTask() throws Exception {
        --- End diff --

        It would be better to name this test testNoContentFetch, because it tests the NO_CONTENT response.
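
As a rough illustration of what a NO_CONTENT test checks: HTTP 204 (No Content) carries no body, and the test in this pull request expects the fetcher to end in FETCH_FINISHED for that case. The sketch below maps a response status to a final state. FetchOutcome and stateFor are hypothetical names, and FETCH_FAILED is an assumed state that does not appear in the diff shown here; the real Fetcher may decide differently.

```java
// Hypothetical illustration; the state names mirror TajoProtos.FetcherState
// only where the diff shows them (FETCH_INIT, FETCH_FINISHED).
class FetchOutcome {
    enum State { FETCH_INIT, FETCH_FINISHED, FETCH_FAILED /* assumed */ }

    /** Maps an HTTP status code to the state a fetch would end in. */
    public static State stateFor(int httpStatus) {
        // 200 OK delivers data; 204 No Content means there was nothing to
        // fetch, which still counts as a completed fetch.
        if (httpStatus == 200 || httpStatus == 204) {
            return State.FETCH_FINISHED;
        }
        // Any other status is treated as a failure in this sketch.
        return State.FETCH_FAILED;
    }
}
```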

        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on a diff in the pull request:

        https://github.com/apache/tajo/pull/58#discussion_r14805622

        --- Diff: tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java ---
        @@ -96,29 +113,83 @@ public void testAdjustFetchProcess() {
           @Test
           public void testStatus() throws Exception {
             Random rnd = new Random();
        -    FileWriter writer = new FileWriter(INPUT_DIR + "data");
        -    String data;
        +    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
        +    String sid = "1";
        +    String ta = "1_0";
        +    String partId = "1";
        +
        +    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
        +    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
        +
        +    FSDataOutputStream stream = LocalFileSystem.get(conf).create(new Path(dataPath), true);
             for (int i = 0; i < 100; i++) {
        -      data = ""+rnd.nextInt();
        -      writer.write(data);
        +      String data = ""+rnd.nextInt();
        +      stream.write(data.getBytes());
             }
        -    writer.flush();
        -    writer.close();
        +    stream.flush();
        +    stream.close();
        +
        +    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
        +    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
        +    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
        +
        +    fetcher.get();
        +    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
        +  }
        -    DataRetriever ret = new DirectoryRetriever(INPUT_DIR);
        -    final HttpDataServer server = new HttpDataServer(
        -        NetUtils.createSocketAddr("127.0.0.1:0"), ret);
        -    server.start();
        -    InetSocketAddress addr = server.getBindAddress();
        +  @Test
        +  public void testEmptyFileTask() throws Exception {
        +
        +    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
        +    String sid = "1";
        +    String ta = "1_0";
        +    String partId = "1";
        +
        +    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
        +    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
        +
        +    Path inputPath = new Path(dataPath);
        +    if(LocalFileSystem.get(conf).exists(inputPath)) {
        +      LocalFileSystem.get(conf).delete(new Path(dataPath), true);
        +    }
        -    URI uri = URI.create("http://127.0.0.1:"+addr.getPort() + "/data");
        -    ClientSocketChannelFactory channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
        +    FSDataOutputStream stream = LocalFileSystem.get(conf).create(new Path(dataPath).getParent(), true);
        +    stream.close();
        -    final Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"), channelFactory);
        +    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
        +    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
             assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());

             fetcher.get();
             assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());

        -    server.stop();
        +  }
        +
        +  @Test
        +  public void testFailureStatus() throws Exception {
        +    Random rnd = new Random();
        +
        +    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
        +    String sid = "1";
        +    String ta = "1_0";
        +    String partId = "1";
        +
        +    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
        +    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "x", ta);
        --- End diff --

        The fetch failure is caused by the shuffle type character 'x', because only 'h', 'r', and 's' are allowed. However, most contributors are unlikely to know this. Could you add a brief comment explaining it?
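
To make the point in this comment concrete, here is a minimal sketch of checking the type parameter of a fetch request against the allowed shuffle type characters. ShuffleTypeCheck, parseParams, and hasValidShuffleType are hypothetical names, and this is not the pull server's actual validation code; only the allowed set 'h', 'r', 's' and the parameter layout come from the comment and the test above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not part of Tajo.
class ShuffleTypeCheck {
    // Allowed shuffle type characters, as stated in the review comment.
    private static final Set<String> ALLOWED =
        new HashSet<>(Arrays.asList("h", "r", "s"));

    /** Splits "k1=v1&k2=v2" into a map; a key without '=' maps to "". */
    public static Map<String, String> parseParams(String query) {
        Map<String, String> params = new HashMap<>();
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    /** Returns true only if the "type" parameter is one of the allowed characters. */
    public static boolean hasValidShuffleType(String query) {
        return ALLOWED.contains(parseParams(query).get("type"));
    }
}
```

With the parameter string from testFailureStatus, the type value is "x", so this check returns false, which corresponds to the failure the test relies on.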

        githubbot ASF GitHub Bot added a comment -

        Github user jinossy commented on the pull request:

        https://github.com/apache/tajo/pull/58#issuecomment-48706034

        Thank you for the review.
        I have addressed your comments.

        githubbot ASF GitHub Bot added a comment -

        Github user hyunsik commented on the pull request:

        https://github.com/apache/tajo/pull/58#issuecomment-48720796

        +1
        Ship it!

        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/tajo/pull/58

        jhkim Jinho Kim added a comment -

        I've just committed it

        hudson Hudson added a comment -

        SUCCESS: Integrated in Tajo-master-build #292 (See https://builds.apache.org/job/Tajo-master-build/292/)
        TAJO-908: Fetcher does not retry, when pull server connection was closed. (jinho) (jinossy: rev 01857dadd42bd55894ce6edb55159e80c30f6c30)

        • tajo-common/src/main/proto/tajo_protos.proto
        • CHANGES
        • tajo-core/src/main/java/org/apache/tajo/worker/Task.java
        • tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
        • tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
        • tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
        • tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java

          People

          • Assignee:
            jhkim Jinho Kim
            Reporter:
            jhkim Jinho Kim
          • Votes:
            0
            Watchers:
            3
