HAMA-531

Data re-partitioning in BSPJobClient

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.6.1
    • Component/s: None
    • Labels: None

      Description

      Re-partitioning the input data is a very expensive operation. Currently, we process the read/write operations sequentially through the HDFS API in BSPJobClient, on the client side. This can trigger "too many open files" errors, incurs HDFS overhead, and performs slowly.

      We have to find another way to re-partition the data.
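
      Whatever replaces the client-side loop still needs a deterministic rule for assigning records to partitions. Below is a plain-Java sketch of the usual hash-based rule; the class and method names are hypothetical and this is not Hama's actual Partitioner API.

```java
// Hypothetical sketch of hash-based partition assignment.
// Not Hama's Partitioner API; names are illustrative only.
public class HashPartitionSketch {
  // Map a record key to one of numPartitions buckets.
  public static int partitionFor(Object key, int numPartitions) {
    // Mask the sign bit so a negative hashCode cannot produce a negative index.
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}
```

      Because the rule is a pure function of the key, the same key always lands in the same partition, so the re-partitioning step can be moved out of the client without changing which task receives which record.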

      1. HAMA-531_1.patch
        19 kB
        Thomas Jungblut
      2. HAMA-531_2.patch
        32 kB
        Thomas Jungblut
      3. HAMA-531_final.patch
        41 kB
        Thomas Jungblut
      4. patch.txt
        19 kB
        Edward J. Yoon
      5. patch_v02.txt
        30 kB
        Edward J. Yoon
      6. patch_v03.txt
        42 kB
        Edward J. Yoon
      7. patch_v04.txt
        43 kB
        Edward J. Yoon


          Activity

          Edward J. Yoon added a comment -

          My mistake; even if the block size is equal to the desired size, partitioning should still be executed.

          Hudson added a comment -

          Integrated in Hama-Nightly #769 (See https://builds.apache.org/job/Hama-Nightly/769/)
          HAMA-531: Fix build failure (Revision 1423297)
          HAMA-531: Minor changes. Make partition dir configurable (Revision 1423269)

          Result = FAILURE
          edwardyoon :
          Files :

          • /hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java

          edwardyoon :
          Files :

          • /hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
          • /hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
          • /hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
          Hudson added a comment -

          Integrated in Hama trunk #81 (See https://builds.apache.org/job/Hama%20trunk/81/)
          HAMA-531: Fix build failure (Revision 1423297)

          Result = FAILURE
          edwardyoon :
          Files :

          • /hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
          Hudson added a comment -

          Integrated in Hama trunk #80 (See https://builds.apache.org/job/Hama%20trunk/80/)
          HAMA-531: Minor changes. Make partition dir configurable (Revision 1423269)

          Result = FAILURE
          edwardyoon :
          Files :

          • /hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
          • /hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
          • /hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
          Edward J. Yoon added a comment -
          • I've committed minor changes (configurable partition dir).
          • TODO: This partitioning job is needed only if the block size is not equal to the desired size.
          Edward J. Yoon added a comment -

          Committed.

          • The SSSP and bipartite graph examples should be fixed.
          • A new ticket should be created for VertexInputReader.
          Thomas Jungblut added a comment -

          We will also need a VertexOutputWriter that converts the finished, computed vertices to the output. Currently we just write the vertex ID and vertex value, but many algorithms may require writing edge values as well.

          Giraph does this too, and it is well justified for decoupling the raw input from the API.

          Edward J. Yoon added a comment -

          I meant that there are already a few changes, so I recommended you use SequenceFileInput, Text, and TextWritable without changing symmetricmatrixgen, pagerank, mdstsearch, etc.

          I don't know why you're saying that.

          Thomas Jungblut added a comment -

          Okay Edward, you haven't understood how it works.

          Maybe it will be clearer when I show you some code.

          Mindist search example

          If you define in the job:

              job.setInputKeyClass(Text.class);
              job.setInputValueClass(TextArrayWritable.class);
              job.setInputFormat(SequenceFileInputFormat.class);
              job.setVertexInputReaderClass(MindistSearchCountReader.class);
          

          And you define your reader like this:

          public static class MindistSearchCountReader extends
              VertexInputReader<Text, TextArrayWritable, Text, NullWritable, Text> {

            @Override
            public boolean parseVertex(Text key, TextArrayWritable value,
                Vertex<Text, NullWritable, Text> vertex) {
              vertex.setVertexID(key);
              for (Text edgeName : value.get()) {
                vertex.addEdge(new Edge<Text, NullWritable>(new Text(edgeName), null));
              }
              return true;
            }
          }

          Then you can support your binary SequenceFile format, as well as every other format that exists in the whole world.

          If you want to have a binary SequenceFile format, then do this. But I will quit committing to Hama then, because I'm not going to support ONLY a binary format. This is not what I built a framework for.

          What if you want to change this binary format? Do you want to recreate every file on the whole planet? You would have to take care of versioning then, and that is why we need a proxy between the input format and our vertex API.

          Edward J. Yoon added a comment -

          If possible, please use SequenceFileInputFormat, Text, and TextArrayWritable.

          Thomas Jungblut added a comment -

          Yes, we need this reader; the bipartite graph matching example, for instance, wouldn't work without it.
          But it is enough to add a single reader for each value type, so we should be okay with 1 or 2.

          Edward J. Yoon added a comment -

          Attaching again; fixed PageRankTest.

          Okay. BTW, should all the examples then have their own parseVertex() method again?

          Thomas Jungblut added a comment -

          Thanks. I have a presentation today, so I will be back home quite late. But I will add the backward compatibility to your jobs. Please be patient.

          Edward J. Yoon added a comment -

          This patch adds a partitioning job. You can check TestSubmitGraphJob to see how it works.

          Edward J. Yoon added a comment -

          This patch fixes the unit tests, except the weighted graph example (SSSP). Once that's all done, I'll fix the partitioner.

          My plan is to partition the input data with a BSP job. Each task processes a single input data block and writes files into a destination directory; finally, the files are merged. The number of partitions can then be set to any desired number.
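
          The per-task flow described above can be sketched with plain java.io standing in for HDFS. All names here are hypothetical; this is not the PartitioningRunner implementation, just the partition-then-merge idea under those assumptions.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical sketch of the plan above: each task partitions its own input
// block into per-partition files, and the per-task files are merged at the end.
public class PartitionJobSketch {

  // Each task writes the lines of its block into numPartitions bucket files.
  static void partitionBlock(int taskId, List<String> block, int numPartitions,
                             Path destDir) throws IOException {
    for (String line : block) {
      int p = (line.hashCode() & Integer.MAX_VALUE) % numPartitions;
      Path bucket = destDir.resolve("task-" + taskId + "-part-" + p);
      Files.write(bucket, (line + "\n").getBytes("UTF-8"),
          StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
  }

  // Finally, concatenate every task's file for partition p into one output file.
  static void mergePartition(int numTasks, int p, Path destDir) throws IOException {
    Path out = destDir.resolve("part-" + p);
    try (OutputStream os = Files.newOutputStream(out)) {
      for (int t = 0; t < numTasks; t++) {
        Path bucket = destDir.resolve("task-" + t + "-part-" + p);
        if (Files.exists(bucket)) {
          Files.copy(bucket, os);
        }
      }
    }
  }
}
```

          Since the merge step only concatenates, the number of final partitions is fixed by numPartitions alone, independent of how many blocks or tasks produced the intermediate files.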

          Edward J. Yoon added a comment -

          P.S., the modifications to FileInputFormat are just for testing.

          Edward J. Yoon added a comment -

          This patch cleans up the graph package.

          Edward J. Yoon added a comment - edited

          I think the input partitioning issue should be handled at the BSP framework level, but VertexInputReader holds back the possibilities for integration. Since users can create their own InputFormat, I don't think it's an important feature.

          Moreover, once the partitioning process is separated from the graph job, it will become possible to specify the desired number of tasks.

          Edward J. Yoon added a comment -

          I think the partitioner should be reusable between the core and graph packages, but I have no idea about VertexReader.

          Edward J. Yoon added a comment -

          This issue must be fixed.

          Thomas Jungblut added a comment -

          Thanks Edward, you are right.
          I've just observed it in the test cases:

          12/05/23 19:41:37 INFO bsp.BSPJobClient: Running job: job_localrunner_0001
          12/05/23 19:41:37 ERROR bsp.LocalBSPRunner: Exception during BSP execution!
          java.io.IOException: org.apache.hama.graph.VertexWritable@78092b6f read 42 bytes, should read 49
          	at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2129)
          	at org.apache.hama.bsp.SequenceFileRecordReader.next(SequenceFileRecordReader.java:82)
          	at org.apache.hama.bsp.TrackedRecordReader.moveToNext(TrackedRecordReader.java:60)
          	at org.apache.hama.bsp.TrackedRecordReader.next(TrackedRecordReader.java:46)
          	at org.apache.hama.bsp.BSPPeerImpl.readNext(BSPPeerImpl.java:495)
          	at org.apache.hama.graph.GraphJobRunner.loadVertices(GraphJobRunner.java:395)
          

          I'll fix this in HAMA-580.

          Thomas Jungblut added a comment -

          However, the question is whether the input is broken or the reader. The split is handled by the SequenceFile and not by the filesystem. I'll take a closer look then. Thanks for the observation.

          Edward J. Yoon added a comment -

          Haha, I'm heading out to dinner. cu again.

          Thomas Jungblut added a comment -

          That is really bad.

          Edward J. Yoon added a comment -

          > If you don't provide the task number, does it work as well?

          No. I think it works only on a single machine.

          Thomas Jungblut added a comment -

          It seems the SequenceFile was split right in the middle of a Text record. If you don't provide the task number, does it work as well?

          Edward J. Yoon added a comment -
          12/05/23 19:03:17 DEBUG graph.GraphJobRunner: Combiner class: org.apache.hama.examples.SSSP$MinIntCombiner
          12/05/23 19:03:17 DEBUG graph.GraphJobRunner: vertex class: org.apache.hama.examples.SSSP$ShortestPathVertex
          12/05/23 19:03:17 ERROR bsp.BSPTask: Error running bsp setup and bsp function.
          java.io.IOException: org.apache.hadoop.io.Text read 31 bytes, should read 190
          	at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2129)
          	at org.apache.hama.bsp.SequenceFileRecordReader.next(SequenceFileRecordReader.java:82)
          	at org.apache.hama.bsp.TrackedRecordReader.moveToNext(TrackedRecordReader.java:60)
          	at org.apache.hama.bsp.TrackedRecordReader.next(TrackedRecordReader.java:46)
          	at org.apache.hama.bsp.BSPPeerImpl.readNext(BSPPeerImpl.java:482)
          	at org.apache.hama.graph.GraphJobRunner.loadVertices(GraphJobRunner.java:280)
          	at org.apache.hama.graph.GraphJobRunner.setup(GraphJobRunner.java:113)
          	at org.apache.hama.bsp.BSPTask.runBSP(BSPTask.java:166)
          	at org.apache.hama.bsp.BSPTask.run(BSPTask.java:144)
          	at org.apache.hama.bsp.GroomServer$BSPPeerChild.main(GroomServer.java:1097)
          12/05/23 19:03:17 INFO zookeeper.ZooKeeper: Session: 0x137792074af000e closed
          12/05/23 19:03:17 INFO zookeeper.ClientCnxn: EventThread shut down
          12/05/23 19:03:17 ERROR bsp.BSPTask: Shutting down ping service.
          12/05/23 19:03:17 FATAL bsp.GroomServer: Error running child
          java.io.IOException: org.apache.hadoop.io.Text read 31 bytes, should read 190
          	at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2129)
          	at org.apache.hama.bsp.SequenceFileRecordReader.next(SequenceFileRecordReader.java:82)
          	at org.apache.hama.bsp.TrackedRecordReader.moveToNext(TrackedRecordReader.java:60)
          	at org.apache.hama.bsp.TrackedRecordReader.next(TrackedRecordReader.java:46)
          	at org.apache.hama.bsp.BSPPeerImpl.readNext(BSPPeerImpl.java:482)
          	at org.apache.hama.graph.GraphJobRunner.loadVertices(GraphJobRunner.java:280)
          	at org.apache.hama.graph.GraphJobRunner.setup(GraphJobRunner.java:113)
          	at org.apache.hama.bsp.BSPTask.runBSP(BSPTask.java:166)
          	at org.apache.hama.bsp.BSPTask.run(BSPTask.java:144)
          	at org.apache.hama.bsp.GroomServer$BSPPeerChild.main(GroomServer.java:1097)
          java.io.IOException: org.apache.hadoop.io.Text read 31 bytes, should read 190
          	at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2129)
          	at org.apache.hama.bsp.SequenceFileRecordReader.next(SequenceFileRecordReader.java:82)
          	at org.apache.hama.bsp.TrackedRecordReader.moveToNext(TrackedRecordReader.java:60)
          	at org.apache.hama.bsp.TrackedRecordReader.next(TrackedRecordReader.java:46)
          	at org.apache.hama.bsp.BSPPeerImpl.readNext(BSPPeerImpl.java:482)
          	at org.apache.hama.graph.GraphJobRunner.loadVertices(GraphJobRunner.java:280)
          	at org.apache.hama.graph.GraphJobRunner.setup(GraphJobRunner.java:113)
          	at org.apache.hama.bsp.BSPTask.runBSP(BSPTask.java:166)
          	at org.apache.hama.bsp.BSPTask.run(BSPTask.java:144)
          	at org.apache.hama.bsp.GroomServer$BSPPeerChild.main(GroomServer.java:1097)
          
          Thomas Jungblut added a comment -

          Any task logs? Why does it work with 3 tasks and not with 4?

          Edward J. Yoon added a comment -

          It seems there's a bug in partitioning.

          edward@slave:~/workspace/hama-trunk$ bin/hama jar examples/target/hama-examples-0.5.0-incubating-SNAPSHOT.jar sssp 3 /user/edward/data/part-r-00000 output 3
          12/05/23 18:46:33 DEBUG bsp.BSPJobClient: BSPJobClient.submitJobDir: hdfs://slave.udanax.org:9001/tmp/hadoop-edward/bsp/system/submit_mue8lf
          12/05/23 18:46:33 DEBUG bsp.BSPJobClient: Creating splits at hdfs://slave.udanax.org:9001/tmp/hadoop-edward/bsp/system/submit_mue8lf/job.split
          12/05/23 18:46:33 INFO bsp.FileInputFormat: Total input paths to process : 1
          12/05/23 18:46:33 DEBUG bsp.FileInputFormat: computeSplitSize: 70724 (70724, 2000, 67108864)
          12/05/23 18:46:33 INFO bsp.FileInputFormat: Total # of splits: 3
          12/05/23 18:46:33 INFO bsp.BSPJobClient: Running job: job_201205231839_0009
          12/05/23 18:46:36 INFO bsp.BSPJobClient: Current supersteps number: 0
          12/05/23 18:46:39 INFO bsp.BSPJobClient: Current supersteps number: 12
          12/05/23 18:46:39 INFO bsp.BSPJobClient: The total number of supersteps: 12
          12/05/23 18:46:39 DEBUG bsp.Counters: Adding SUPERSTEPS
          12/05/23 18:46:39 INFO bsp.BSPJobClient: Counters: 10
          12/05/23 18:46:39 INFO bsp.BSPJobClient:   org.apache.hama.bsp.JobInProgress$JobCounter
          12/05/23 18:46:39 INFO bsp.BSPJobClient:     LAUNCHED_TASKS=3
          12/05/23 18:46:39 INFO bsp.BSPJobClient:   org.apache.hama.bsp.BSPPeerImpl$PeerCounter
          12/05/23 18:46:39 INFO bsp.BSPJobClient:     SUPERSTEPS=12
          12/05/23 18:46:39 INFO bsp.BSPJobClient:     COMPRESSED_BYTES_SENT=27902
          12/05/23 18:46:39 INFO bsp.BSPJobClient:     SUPERSTEP_SUM=36
          12/05/23 18:46:39 INFO bsp.BSPJobClient:     TIME_IN_SYNC_MS=4369
          12/05/23 18:46:39 INFO bsp.BSPJobClient:     IO_BYTES_READ=212069
          12/05/23 18:46:39 INFO bsp.BSPJobClient:     COMPRESSED_BYTES_RECEIVED=27902
          12/05/23 18:46:39 INFO bsp.BSPJobClient:     TOTAL_MESSAGES_SENT=4374
          12/05/23 18:46:39 INFO bsp.BSPJobClient:     TASK_INPUT_RECORDS=100
          12/05/23 18:46:39 INFO bsp.BSPJobClient:     TOTAL_MESSAGES_RECEIVED=2187
          Job Finished in 6.517 seconds
          edward@slave:~/workspace/hama-trunk$ bin/hama jar examples/target/hama-examples-0.5.0-incubating-SNAPSHOT.jar sssp 3 /user/edward/data/part-r-00000 output 4
          12/05/23 18:46:44 DEBUG bsp.BSPJobClient: BSPJobClient.submitJobDir: hdfs://slave.udanax.org:9001/tmp/hadoop-edward/bsp/system/submit_a44pqb
          12/05/23 18:46:44 DEBUG bsp.BSPJobClient: Creating splits at hdfs://slave.udanax.org:9001/tmp/hadoop-edward/bsp/system/submit_a44pqb/job.split
          12/05/23 18:46:44 INFO bsp.FileInputFormat: Total input paths to process : 1
          12/05/23 18:46:44 DEBUG bsp.FileInputFormat: computeSplitSize: 53043 (53043, 2000, 67108864)
          12/05/23 18:46:44 INFO bsp.FileInputFormat: Total # of splits: 4
          12/05/23 18:46:44 INFO bsp.BSPJobClient: Running job: job_201205231839_0010
          12/05/23 18:46:47 INFO bsp.BSPJobClient: Current supersteps number: 0
          12/05/23 18:46:56 INFO bsp.BSPJobClient: Job failed.
          
          Thomas Jungblut added a comment -

          I think we should delay this a bit. I'm going to commit this partitioning for graph algorithms now since this is our major feature.

          Edward J. Yoon added a comment -

          I think there are some issues related to the reopenInput() function, etc.

          praveen sripati added a comment -

          The partitioned data could optionally be written to HDFS so that another BSP (or MR) job can be run without partitioning the data again. HAMA-577 has been created for this.

          Edward J. Yoon added a comment -

          In BSP case, it looks like pre-partitioned data is needed.

          praveen sripati added a comment -

          >However we should think about how we build the pre-job partitioner.

          Thomas - HAMA-561 has been created for processing already partitioned files.

          Edward J. Yoon added a comment -
          >I take a first shot for the graph algorithms.
          >I guess we should distinguish between pre-job partitioning and runtime partitioning. For graph algorithms we can use runtime partitioning.
          >For other algorithms this might not be suitable.

          +1

          Thomas Jungblut added a comment -

          Okay, it works now. Basically, the PageRank input's adjacent edges were typed as double instead of null.

          This broke the serialization.

          Fixed, and the build is fine. I'd like to commit this tomorrow.

          However, we should think about how we build the pre-job partitioner.

          Thomas Jungblut added a comment -

          Works quite okay, except that the edge-weight serialization breaks things.

          I don't know whether it is related to NullWritable or whether I made another mistake; I will fix it soon. Currently this part is deactivated.

          Thomas Jungblut added a comment -

          A small patch, quickly hacked together.

          Thomas Jungblut added a comment -

          I take a first shot for the graph algorithms.
          I guess we should distinguish between pre-job partitioning and runtime partitioning. For graph algorithms we can use runtime partitioning.
          For other algorithms this might not be suitable.

          Thomas Jungblut added a comment -

          Two possible approaches:

          We schedule a BSP job to write to a given number of files,
          OR we use the same logic as the graph repair, which takes a first superstep to read everything and distribute it among the tasks afterwards.

          I think the latter solution is quite simple.

          Does anyone know how it is done in Giraph?

          I don't know; I'd bet on the second solution, since their mapper input isn't very likely to be partitioned.

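The second approach above (a first superstep that reads local splits and redistributes records to their owning tasks) can be sketched with plain collections standing in for Hama's messaging. All class and method names below are made up for illustration; a real implementation would route records through the BSPPeer send/sync API rather than in-memory queues.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a redistribution superstep: every task hashes each locally
// read record to its destination peer. Peers and message queues are
// simulated here with plain collections.
public class HashRepartitionSketch {

    // Hash-partitioner logic: map a record key to one of numTasks peers.
    static int partitionFor(String key, int numTasks) {
        return Math.abs(key.hashCode() % numTasks);
    }

    // Simulate the superstep: each record ends up in the "message queue"
    // of the peer that owns its partition.
    static Map<Integer, List<String>> repartition(List<String> records, int numTasks) {
        Map<Integer, List<String>> queues = new HashMap<>();
        for (int i = 0; i < numTasks; i++) {
            queues.put(i, new ArrayList<>());
        }
        for (String record : records) {
            String key = record.split("\t", 2)[0]; // key = first column
            queues.get(partitionFor(key, numTasks)).add(record);
        }
        return queues; // after a global sync, each peer would drain its queue
    }

    public static void main(String[] args) {
        List<String> records = List.of("v1\tv2,v3", "v2\tv1", "v3\tv1,v2");
        Map<Integer, List<String>> queues = repartition(records, 2);
        int total = queues.get(0).size() + queues.get(1).size();
        System.out.println("records redistributed: " + total); // 3
    }
}
```

The appeal of this variant is that no extra job is scheduled: the redistribution costs exactly one read plus one sync before processing starts.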
          Thomas Jungblut added a comment -

          Sounds reasonable to schedule a BSP job. Should we put this into 0.5.0?

          praveen sripati added a comment -

          I haven't gathered any performance metrics, but partitioning in the BSPJobClient (on the node on which the BSP job is submitted) does not seem very efficient. Moving the data partitioning out of the BSPJobClient so that it happens in parallel would drastically reduce the total processing time. So, I am interested in getting some discussion going around this JIRA.

          In the JIRA two approaches have been mentioned:

          1. Using BSP to partition the data.
          2. Using MR to partition the data.

          Using the MR approach:

          • The data has to be read by the mappers (READ)
          • The output of the mappers has to be written to the file system (WRITE)
          • Reducers have to read the data back from the file system (READ)
          • Reducers process and write the data back to HDFS (WRITE)
          • The BSP job reads the MR output (READ) and does the processing

          So, there are 3 reads and 2 writes before the data is actually processed by the BSP job.

          Using the BSP approach:

          • The data is read by the BSP tasks (READ)
          • Each BSP task checks which task a record belongs to using the partitioner and sends the record as a message to the appropriate task.
          • Global sync
          • The BSP tasks write the partitioned data to HDFS (optional WRITE)
          • The various BSP tasks receive the messages and start processing immediately.

          So, there is only 1 read.

          Partitioning using BSP seems to be much faster than using MR. The only advantage I see of the MR approach is that, since the partitioned data is written to disk, the same BSP job can be run multiple times without partitioning the data again. Of course, the BSP tasks could also write the partitioned data to HDFS to be processed later if required. I don't see any obvious advantage of the MR approach over the BSP approach.

          Does anyone know how it is done in Giraph?

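The "partitioner" step in the BSP flow above follows the usual Hadoop-style contract: a pure function from (key, value, task count) to a task index, so the same key always lands on the same task. A minimal self-contained sketch, with the interface reproduced locally for illustration (Hama ships its own partitioner types; the names below are stand-ins):

```java
// Illustrative stand-in for a Hama/Hadoop-style partitioner contract.
interface SimplePartitioner<K, V> {
    int getPartition(K key, V value, int numTasks);
}

// Default choice: hash the key and take it modulo the task count,
// giving a deterministic task index in [0, numTasks).
class SimpleHashPartitioner<K, V> implements SimplePartitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numTasks) {
        return Math.abs(key.hashCode() % numTasks);
    }
}

public class PartitionerDemo {
    public static void main(String[] args) {
        SimplePartitioner<String, String> p = new SimpleHashPartitioner<>();
        // Each record is routed to exactly one of the 4 tasks.
        System.out.println("task for 'vertex-7': " + p.getPartition("vertex-7", "", 4));
    }
}
```

Because the mapping is deterministic, every task can compute destinations independently, which is what makes the single-read BSP flow above possible without any central coordination.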
          Thomas Jungblut added a comment -

          That could be an idea.
          We could also schedule a MapReduce job to do this partitioning when a cluster is available, or we could schedule a BSP job to do it, as you said.

          Edward J. Yoon added a comment -

          In my opinion, each task should load its locally assigned data and transfer it across the network in an optimized way.


            People

            • Assignee:
              Edward J. Yoon
            • Reporter:
              Edward J. Yoon
            • Votes: 0
            • Watchers: 5
