Cassandra / CASSANDRA-5544

Hadoop jobs assign only one mapper per task

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 1.2.6
    • Component/s: None
    • Labels:
      None
    • Environment:

      Red Hat Linux 5.4, Hadoop 1.0.3, Pig 0.11.1

      Description

      We have observed very strange behaviour of the Hadoop cluster after upgrading
      Cassandra from 1.1.5 to 1.2.1. We have a 5-node Cassandra cluster, where three of the nodes are Hadoop slaves. Now when we submit a job through a Pig script, only one map task is assigned, running on one of the Hadoop slaves, regardless of the
      volume of data (already tried with more than a million rows).
      Pig is configured as follows:
      export PIG_HOME=/oracle/pig-0.10.0
      export PIG_CONF_DIR=${HADOOP_HOME}/conf
      export PIG_INITIAL_ADDRESS=192.168.157.103
      export PIG_RPC_PORT=9160
      export PIG_PARTITIONER=org.apache.cassandra.dht.Murmur3Partitioner

      We also have the following properties set in Hadoop:
      <property>
      <name>mapred.tasktracker.map.tasks.maximum</name>
      <value>10</value>
      </property>
      <property>
      <name>mapred.map.tasks</name>
      <value>4</value>
      </property>

      1. 5544.txt
        0.7 kB
        Alex Liu
      2. 5544-1.txt
        2 kB
        Alex Liu
      3. 5544-2.txt
        4 kB
        Alex Liu
      4. 5544-3.txt
        3 kB
        Alex Liu
      5. Screen Shot 2013-05-26 at 4.49.48 PM.png
        82 kB
        Shamim Ahmed

        Issue Links

          Activity

          cscetbon Cyril Scetbon added a comment -

          Same issue with Cassandra 1.2.3. I've tested with both RandomPartitioner and Murmur3Partitioner.
          shamim_ru Shamim Ahmed added a comment -

          For more information, here are some threads from the mail archive:
          1) http://www.mail-archive.com/user@cassandra.apache.org/msg29663.html
          2) http://www.mail-archive.com/user@cassandra.apache.org/msg28016.html
          3) http://www.mail-archive.com/user@cassandra.apache.org/msg29425.html
          brandon.williams Brandon Williams added a comment -

          Does 1.1.11 have the same problem?

          shamim_ru Shamim Ahmed added a comment - edited

          Cassandra version 1.1.11 does not have this problem. I tested on a single-node cluster and it created 15 maps.
          Please see the attached screenshot.

          cscetbon Cyril Scetbon added a comment -

          So something goes wrong with the 1.2.x versions.

          brandon.williams Brandon Williams added a comment -

          Can you take a look, Alex? Nothing changed in Pig as far as I know.

          alexliu68 Alex Liu added a comment -

          [~shamim] How many splits do you get for each Hadoop node? You can set ConfigHelper.setInputSplitSize to a smaller number to get more mappers for your Pig job. The existing CassandraStorage class doesn't set it, so it uses the default value of 64k. So if a node has fewer than 64k rows, it will have only one mapper.
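          The arithmetic here can be sketched as follows (illustrative Python, not Cassandra code; the 64k default is from this thread, the per-node row counts are made-up examples):

```python
import math

def expected_mappers(rows_per_node, split_size=64 * 1024):
    """Each split becomes one mapper; a node holding fewer rows than
    one split still yields a single mapper."""
    return max(1, math.ceil(rows_per_node / split_size))

print(expected_mappers(50_000))     # 1 mapper: under the 64k-row default
print(expected_mappers(1_000_000))  # 16 mappers
```

          Lowering the split size (or holding more rows per node) is what raises the mapper count; mapred.map.tasks alone cannot.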

          alexliu68 Alex Liu added a comment -

          Some changes have been made to the CassandraColumnInputFormat class since 1.1.5, e.g.:

          add describe_splits_ex providing improved split size estimate
          patch by Piotr Kolaczkowski; reviewed by jbellis for CASSANDRA-4803

          cscetbon Cyril Scetbon added a comment -

          Alex Liu I did some tests with more than 64k rows and had only one mapper for the whole cluster. Even if we have fewer than 64k rows, why don't we have at least one mapper per node (in my case replication_factor=1) to work on rows using data locality? Vnodes are enabled on my cluster; could there be a relation with this option?

          alexliu68 Alex Liu added a comment -

          Yes, if vnodes are enabled, it creates a lot of smaller splits (which is not preferred; we will fix the vnode Hadoop too-many-small-splits issue later), so can you test it with vnodes disabled?

          cscetbon Cyril Scetbon added a comment -

          But if there are many small splits, doesn't that mean we should have more mappers? I'm saying that because you proposed that Shamim Ahmed decrease ConfigHelper.setInputSplitSize exactly for that reason, right?
          I need one more day to test without vnodes.

          alexliu68 Alex Liu added a comment -

          The current implementation only matches one mapper to a split. The existing code doesn't set InputSplitSize (which means we can't change it to a smaller number unless we change the code in the setLocation method to do it), so we need more than 64k rows to have more than one mapper per node.

          For vnodes we need to support a virtual split which combines multiple small splits.

          cscetbon Cyril Scetbon added a comment - edited

          Okay. I'll test without vnodes and give you feedback, unless Shamim Ahmed confirms that he didn't use vnodes, which I suppose is the case as he upgraded from C* 1.1.5 to 1.2.1.

          shamim_ru Shamim Ahmed added a comment -

          Alex Liu
          1) I am using Pig and actually don't know how many splits I had (I am very curious to know how to calculate the split count). However, I had more than 30 million rows.
          2) I didn't use vnodes.
          3) SET mapred.min.split.size 12500000;
          SET mapred.max.split.size 12500000;
          doesn't help at all.
          4) SET pig.noSplitCombination true; - did some magic trick: we got more than 100 maps, but 2 of them (always two maps) got very large Map input records and run for hours.
          5) Observed one very interesting thing when using SET pig.noSplitCombination true: a lot of maps are created with
          Map input records = 0.

          alexliu68 Alex Liu added a comment -

          To get the splits for a node, call the thrift API client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize); it returns the splits for that node.

          Here range.start_token and range.end_token are the start and end tokens of the node, and splitsize is 64 * 1024.

          alexliu68 Alex Liu added a comment -

          [~shamim] I think you already found the answer: SET pig.noSplitCombination true, so Pig doesn't combine the small splits into one mapper. HBase's internal code does it as well. I found that C*-1.2.1 updated Pig from version 0.9.0 to 0.10.0, which may have caused the behavior change.

          As far as numbers 4) and 5) are concerned, I think the empty maps/big maps are due to data skew. If you first print out the splits, you can then check the rows for each split.

          I will add the following code to CassandraStorage.java

          job.getConfiguration().setBoolean("pig.noSplitCombination", true);

          alexliu68 Alex Liu added a comment -

          I attached the patch.

          cscetbon Cyril Scetbon added a comment -

          AFAIK split combination is used to improve performance. Doesn't the same apply to Cassandra?
          And if performance decreases without split combination, will it decrease even more with vnodes?

          alexliu68 Alex Liu added a comment -

          CassandraColumnInputFormat defines the split size, so we don't want Pig to override it by combining splits. We can always tune the split size to tune performance. As a next step, we can open it up a little so that Pig users can specify the split size configuration.

          Vnode Hadoop performance generally decreases; we can do the split combination on the Cassandra side to improve performance, which could be another ticket.

          alexliu68 Alex Liu added a comment -

          The version 2 patch is attached. It allows the user to define PIG_INPUT_SPLIT_SIZE in the system environment.

          shamim_ru Shamim Ahmed added a comment -

          Alex, thank you very much for your quick response.
          However, I am afraid the above patch will not solve the problem I described: "we got more than 100 maps but 2 of them (always two maps) got very large Map input records and run for more than hours - point 4" - this behavior is unexpected. It means Map input records are not distributed evenly across the cluster: most of the maps get Map input records = 10000, but two of them get millions.
          Certainly I will do some tests through the thrift API as you described.
          One more thing: would you kindly allow the user to define PIG_INPUT_SPLIT_SIZE through the Cassandra store URL, as in "STORE updated INTO 'cassandra://KEYSPACE/CF?allow_deletes=true&PIG_INPUT_SPLIT_SIZE=xxxxxx' USING CassandraStorage()", instead of the system environment?

          cscetbon Cyril Scetbon added a comment -

          Or maybe via a SET PIG_INPUT_SPLIT_SIZE in the Pig script?
          Alex Liu I'll open the second ticket to improve performance with vnodes, unless you prefer to open it, which could be better.

          alexliu68 Alex Liu added a comment -

          Version 3 is attached. I added split_size as a parameter.
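          For illustration, loading with the new split_size URL parameter from a Pig script might look like the following sketch (the keyspace, column family, and the value 1024 are placeholders, not taken from the patch itself):

```
rows = LOAD 'cassandra://MyKeyspace/MyColumnFamily?split_size=1024'
       USING CassandraStorage();
```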

          alexliu68 Alex Liu added a comment -

          Cyril Scetbon please open it; someone else may already have opened it.

          brandon.williams Brandon Williams added a comment -

          Committed, with an update to the README to document split_size.

          alexliu68 Alex Liu added a comment -

          Version 4 is attached; it removes reading the split size from the system env.

          brandon.williams Brandon Williams added a comment -

          I'm fine with leaving that in for now.

          shamim_ru Shamim Ahmed added a comment -

          I plan to do some testing over the weekend.

          cscetbon Cyril Scetbon added a comment - edited

          My tests confirm that I have multiple mappers (1025) and each mapper works on a range of my column family: http://pastebin.com/vL3uC5Ca. Good job!

          shamim_ru Shamim Ahmed added a comment - edited

          Did you run the map reduce job through Pig?

          cscetbon Cyril Scetbon added a comment - edited

          Yes. I used Pig 0.11.1, Hadoop 1.1.2 (as newer versions are not supported, CASSANDRA-5201) and Cassandra 1.2.3 (I applied the current patch from the git commits and built from source).

          shamim_ru Shamim Ahmed added a comment -

          At last, I could manage a few hours to try the fix. It's definitely working now; every mapper works on its own range. However, I tested on a single-node cluster with Hadoop 1.1.2 + Pig 0.11.1 and Cassandra 1.2.6. Thanks.


            People

            • Assignee:
              alexliu68 Alex Liu
              Reporter:
              shamim_ru Shamim Ahmed
              Reviewer:
              Brandon Williams
            • Votes: 4
            • Watchers: 4
