CASSANDRA-2388

ColumnFamilyRecordReader fails for a given split because a host is down, even if records could reasonably be read from other replicas.

    Details

      Description

      ColumnFamilyRecordReader only tries the first location for a given split. We should try multiple locations for a given split.
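
      As a rough illustration of the intended behaviour (a minimal sketch only; the Connector interface and the method names below are invented for this example and are not the attached patches), the record reader would walk the split's locations in turn and fail only once every replica has been tried:

          import java.util.List;

          public final class SplitFailover
          {
              // Stand-in for whatever opens the Thrift connection in ColumnFamilyRecordReader.
              public interface Connector<T>
              {
                  T connect(String host) throws Exception;
              }

              // Try each replica of the split in turn; fail only when every location has failed.
              public static <T> T connectToAnyReplica(List<String> locations, Connector<T> connector) throws Exception
              {
                  Exception lastFailure = null;
                  for (String host : locations)              // e.g. split.getLocations()
                  {
                      try
                      {
                          return connector.connect(host);    // first reachable replica wins
                      }
                      catch (Exception e)
                      {
                          lastFailure = e;                   // remember the failure, try the next replica
                      }
                  }
                  throw lastFailure != null ? lastFailure : new Exception("split has no locations");
              }
          }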

      1. 0002_On_TException_try_next_split.patch
        2 kB
        Eldon Stegall
      2. CASSANDRA-2388.patch
        8 kB
        Mck SembWever
      3. CASSANDRA-2388.patch
        14 kB
        Mck SembWever
      4. CASSANDRA-2388.patch
        15 kB
        Mck SembWever
      5. CASSANDRA-2388-addition1.patch
        5 kB
        Mck SembWever
      6. CASSANDRA-2388.patch
        0.9 kB
        Mck SembWever
      7. CASSANDRA-2388-extended.patch
        4 kB
        Mck SembWever

        Issue Links

          Activity

          Mck SembWever added a comment -

          The biggest problem is [avoiding endpoints in a different DC]. Maybe the way to do this is to change the getSplits logic to never return replicas in another DC. I think this would require adding DC info to the describe_ring call.

          Tasktrackers may have access to a set of datacenters, so this DC info needs to contain a list of DCs.

          For example, our setup separates datacenters by physical datacenter and hadoop-usage, like:

          DC1 "Production + Hadoop"
            c*01 c*03
          DC2 "Production + Hadoop"
            c*02 c*04
          DC3 "Production"
            c*05
          DC4 "Production"
            c*06

          So here we'd pass to getSplits() a DC info like "DC1,DC2".
          But the problem remains: given a task executing on c*01 that fails to connect to localhost, although we can now prevent a connection to DC3 or DC4, we can't favour a connection to any other split in DC1 over anything in DC2. Is this solvable?
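
          (Sketch for illustration of the DC-restriction half of this idea, assuming the allowed-DC list travels as a plain comma-separated job property; the property name and helper class are invented and nothing like them exists in ConfigHelper:)

              import java.util.Arrays;
              import java.util.HashSet;
              import java.util.Set;

              import org.apache.hadoop.conf.Configuration;

              public final class AllowedDatacenters
              {
                  // Invented property name, for illustration only.
                  private static final String ALLOWED_DCS = "cassandra.input.allowed.datacenters";

                  public static void set(Configuration conf, String dcList)      // e.g. "DC1,DC2"
                  {
                      conf.set(ALLOWED_DCS, dcList);
                  }

                  public static Set<String> get(Configuration conf)
                  {
                      String raw = conf.get(ALLOWED_DCS, "");
                      return raw.isEmpty() ? new HashSet<String>()
                                           : new HashSet<String>(Arrays.asList(raw.split(",")));
                  }
              }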

          Jonathan Ellis added a comment -

          The feasible approach hopefully is still T Jake Luciani's above

          Okay. Referring back to Jake's comments,

          The biggest problem is [avoiding endpoints in a different DC]. Maybe the way to do this is to change the getSplits logic to never return replicas in another DC. I think this would require adding DC info to the describe_ring call.

          I note that we expose node snitch location in system.peers. So at worst we could "join" against that manually.

          Mck SembWever added a comment -

          Jonathan,
          I can't say I'm in favour of enforcing data locality.
          Because data locality in hadoop doesn't work this way: when a tasktracker, through the next heartbeat, announces that it has a task slot free, the jobtracker will do its best to assign it a task with data locality, but failing this will assign it a random task. The number of these random tasks can be quite high, just like I mentioned above.

          Tasks are still being evenly distributed around the ring regardless of what the ColumnFamilySplit.locations is.

          This can almost be solved by upgrading to hadoop-0.21+, using the fair scheduler and setting the property

          <property>
                  <name>mapred.fairscheduler.locality.delay</name>
                  <value>360000000</value>
          </property>

          At the end of the day, while hadoop encourages data locality, it does not enforce it.
          The ideal approach would be to sort all locations by proximity.
          The feasible approach hopefully is still T Jake Luciani's above. In addition I'd be in favour of a setting in the job's configuration as to whether a location from another datacenter can be used.

          references:

          http://www.infoq.com/articles/HadoopInputFormat
          http://www.mentby.com/matei-zaharia/running-only-node-local-jobs.html
          https://groups.google.com/a/cloudera.org/forum/?fromgroups#!topic/cdh-user/3ggnE5hV0PY
          http://www.cs.berkeley.edu/~matei/papers/2010/eurosys_delay_scheduling.pdf

          Scott Fines added a comment -

          I have two distinct use-cases where running TaskTrackers alongside Cassandra nodes does not accomplish our goals:

          1. Joining data. We have a large data set in cassandra, true, but we have a much larger data set held in Hadoop itself (around 4 orders of magnitude larger in hadoop than in cassandra). We need to join the two datasets together, and use the output from that join to feed multiple systems, none of which are cassandra. Since the data in Hadoop is so much larger than that in Cassandra, we have to bring the Cassandra data to hadoop, not the other way around. Because of security concerns, we can't spread our hadoop data onto our cassandra nodes (even if that didn't screw with our capacity planning), so we have no other choice but to move the Cassandra data (in small chunks) onto Hadoop. Why not use HBase, you say? We needed Cassandra for its write performance for other problems than this one.

          2. Offline, incremental backups. We have a large volume of time-series data held in Cassandra, and taking nightly snapshots and moving them to our archival center is prohibitively slow: it turns out that moving RF copies of our entire dataset over a leased line every night is a pretty bad idea. Instead, I use MapReduce to take an incremental backup of a much smaller subset of the data, then move that. That way, we are not only avoiding moving the entire data set, but also using Cassandra's consistency mechanisms to resolve all the replicas. The only efficient way I've found to do this is via MapReduce (we use the Random Partitioner), and since it's an offline backup, we need to move it over the network anyway, so we may as well use the optimized network connecting Hadoop and Cassandra instead of the tiny pipe connecting Cassandra to our archival center.

          Both of these reasons dictate that we not run a TT alongside our Cassandra nodes, no matter what the recommended approach is. In this case, we need a strong, fault-tolerant CFIF to serve our purposes.

          Jonathan Ellis added a comment -

          Jake's plan above seems like a reasonable approach, but let me back up a step. I'm just not convinced that the problem we're trying to solve is a real one. Why do we want to suck a split's worth of data off-node? If it's because you don't have TaskTrackers running on your Cassandra nodes, well, go fix that.

          If it's because Hadoop has created too many tasks and all the local replicas have their task queues full, won't assigning it to a non-local TT just cause more contention than waiting for a local slot to free up?

          Lanny Ripple added a comment - - edited

          Would very much like a fix to this. We have a 40-node ring running 2x hadoop clusters on 20 nodes each. One cluster is on systems that are more flaky than the other (bad batch of memory). When building a split on the first cluster, if a ring node is down in the area of the second cluster, we get timeouts with no way to blacklist the offending node, even though we have replicas local to the first cluster.

          The ring is partitioned into DC1:2, DC2:2 with a hadoop cluster over each DC.

          Jonathan Ellis added a comment -

          Marking as minor since the job should get re-submitted, and it's very difficult to reproduce when the tasktrackers are colocated with cassandra nodes (the recommended configuration).

          Hudson added a comment -

          Integrated in Cassandra-0.7 #552 (See https://builds.apache.org/job/Cassandra-0.7/552/)
          revert CASSANDRA-2388 (again)

          jbellis : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1170333
          Files :

          • /cassandra/branches/cassandra-0.7/CHANGES.txt
          • /cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
          T Jake Luciani added a comment -

          I just want to confirm what this ticket is about.

          The JT has a list of endpoints for a given split.
          When a task runs it may or may not be on one of those nodes.
          If other tasks are running on all those replicas the JT may put them on a remote node.

          So we need to decide which endpoint to connect to, given the chance that nodes are down.

          1. Check if the node running CFRR is one of the replicas (we have this); this means the JT has assigned a data-local task (good).
          2. If none of these nodes is local then pick another.
          3. If the connection fails try one of the other nodes.
          4. Try to avoid endpoints in a different DC.

          The biggest problem is 4. Maybe the way to do this is to change the getSplits logic to never return replicas in another DC. I think this would require adding DC info to the describe_ring call. Then we only need to worry about 1-3.
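
          (A rough sketch of the selection order 1-4 above, for illustration only; the helper and the endpoint-to-DC map are hypothetical, the map standing in for the DC info that describe_ring would have to return:)

              import java.net.InetAddress;
              import java.net.UnknownHostException;
              import java.util.ArrayList;
              import java.util.List;
              import java.util.Map;

              public final class EndpointOrder
              {
                  // Prefer the replica on the local task tracker, then other replicas in the same DC,
                  // and only fall back to remote-DC replicas if that fallback is allowed at all.
                  public static List<String> order(List<String> replicas,
                                                   Map<String, String> endpointToDc,   // hypothetical: DC per endpoint
                                                   String localDc,
                                                   boolean allowRemoteDc) throws UnknownHostException
                  {
                      String local = InetAddress.getLocalHost().getHostName();
                      List<String> ordered = new ArrayList<String>();
                      if (replicas.contains(local))
                          ordered.add(local);                                          // 1. data-local task
                      for (String replica : replicas)
                          if (!ordered.contains(replica) && localDc.equals(endpointToDc.get(replica)))
                              ordered.add(replica);                                    // 2./3. same-DC replicas to retry in turn
                      if (allowRemoteDc)
                          for (String replica : replicas)
                              if (!ordered.contains(replica))
                                  ordered.add(replica);                                // 4. other DCs only as a last resort
                      return ordered;
                  }
              }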

          Mck SembWever added a comment - - edited

          Well, that would work for me; I was only thinking you want to push a "default behavior" (especially for those using a RP).
          But I think a better understanding (at least from me) of hadoop's task scheduling is required before enforcing data locality, as, as-is, it certainly doesn't work for all.

          Jonathan Ellis added a comment -

          Should we just revert the change for now?

          Mck SembWever added a comment -

          In the meantime could we make this behavior configurable?
          e.g. replace CFRR:176 with something like

              if(ConfigHelper.isDataLocalityDisabled())
              {
                  return split.getLocations()[0];
              }
              else
              {
                  throw new UnsupportedOperationException("no local connection available");
              }
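
          (The ConfigHelper methods above don't exist; purely as a sketch, such a switch would typically be wired through the job Configuration along these lines, with an invented property name:)

              import org.apache.hadoop.conf.Configuration;

              public final class DataLocalityConfig
              {
                  // Invented property name, for illustration only.
                  private static final String DISABLE_DATA_LOCALITY = "cassandra.input.disable.data.locality";

                  public static void setDataLocalityDisabled(Configuration conf, boolean disabled)
                  {
                      conf.setBoolean(DISABLE_DATA_LOCALITY, disabled);
                  }

                  public static boolean isDataLocalityDisabled(Configuration conf)
                  {
                      return conf.getBoolean(DISABLE_DATA_LOCALITY, false);   // default: keep requiring a local replica
                  }
              }
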
          Mck SembWever added a comment - - edited

          See last comment. (Say if this should be a separate bug...)

          Maybe hadoop's task allocation isn't working properly because I have an unbalanced ring (I'm working in parallel to fix that).
          If this is the case I think it's an unfortunate limitation (the ring must be balanced to get any decent hadoop performance).
          It's also probably likely when using ConfigHelper.setInputRange(..) that the number of nodes involved is small (approaching RF).
          With the default hadoop scheduler your hadoop cluster is occupied while just a few taskTrackers are busy. Of course switching to the FairScheduler will help some here.

          I'll take a look into hadoop's task allocation code as well...

          Mck SembWever added a comment - - edited

          This approach isn't really working for me and I believe it was committed too quickly.

          Although the documentation on inputSplit.getLocations() is a little thin as to whether this restricts which trackers it should run on or whether it is just a preference

          Tasks are still being evenly distributed around the ring regardless of what the ColumnFamilySplit.locations is.

          The chance of a task actually working is RF/N. Therefore the chances of a blacklisted node are high. Worse, the whole ring can quickly become blacklisted.

          http://abel-perez.com/hadoop-task-assignment has an interesting section explaining how the task assignment is supposed to work (and that data locality is preferred but not a requirement). Could ColumnFamilySplit.locations be in the wrong format? (e.g. should they be IPs, not hostnames?)

          Hudson added a comment -

          Integrated in Cassandra-0.7 #539 (See https://builds.apache.org/job/Cassandra-0.7/539/)
          fail jobs when Cassandra node has failed but TaskTracker has not
          patch by Mck SembWever; reviewed by jbellis and brandonwilliams for CASSANDRA-2388

          jbellis : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1159807
          Files :

          • /cassandra/branches/cassandra-0.7/CHANGES.txt
          • /cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
          Jonathan Ellis added a comment -

          I agree, since the view is from the coordinator, describe_alive_nodes isn't very helpful

          Committed Mck's most recent patch.

          We have a keyspace with RF=1 and the nature of our data allows us to ignore a temporarily missing node

          The "right" fix is to increase RF. Ignoring missing data is not a scenario we want to support.

          Brandon Williams added a comment -

          Does that really fix things though? Because you could have a data node be reachable from the coordinator answering describe_alive_nodes, but unreachable from the client. So the client still needs to be able to skip unreachable endpoints itself, so describe_alive seems like gratuitous complexity.

          I agree, since the view is from the coordinator, describe_alive_nodes isn't very helpful, and also has to wait on the failure detector to mark the node down anyway.

          Patrik Modesto added a comment -

          I'd like to point out the situation in which no node for a given range of keys is available. It can happen, for example, with a keyspace set to RF=1 when a node goes down. I created a patch that gives a user a chance to ignore a missing range/node and continue running the MapReduce job. The patch is here: http://pastebin.com/hhrr8m9P

          Jonathan already replied to the ML with "ignoring unavailable ranges is a misfeature, imo".

          In our case it's very useful, although there may be another/smarter solution. We have a keyspace with RF=1 and the nature of our data allows us to ignore a temporarily missing node. The current ColumnFamilyInputFormat fails with a RuntimeException and AFAIK there is no way around it.

          Jonathan Ellis added a comment -

          Does that really fix things though? Because you could have a data node be reachable from the coordinator answering describe_alive_nodes, but unreachable from the client. So the client still needs to be able to skip unreachable endpoints itself, so describe_alive seems like gratuitous complexity.

          Mck SembWever added a comment -

          2) If we ARE in that situation, the "right" solution would be to send the job to a TT whose local replica IS live, not to read the data from a nonlocal replica. How can we signal that?

          To /really/ solve this issue can we do the following?
          In CFIF.getRangeMap() remove from each range any endpoints that are not alive. A client connection already exists in this method. This filtering out of dead endpoints wouldn't be difficult, and would move tasks to the data, making use of replicas. This approach does need a new method in cassandra.thrift, e.g. list<string> describe_alive_nodes()
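
          (Sketch only: describe_alive_nodes() is the proposed Thrift method and was never added, so the alive-node set below is assumed to come from such a call; the point is just the filtering step in CFIF.getRangeMap():)

              import java.util.ArrayList;
              import java.util.List;
              import java.util.Set;

              public final class AliveEndpointFilter
              {
                  // Drop endpoints the cluster considers dead before the splits are handed to the JobTracker.
                  public static List<String> keepAlive(List<String> rangeEndpoints, Set<String> aliveNodes)
                  {
                      List<String> alive = new ArrayList<String>();
                      for (String endpoint : rangeEndpoints)
                          if (aliveNodes.contains(endpoint))
                              alive.add(endpoint);
                      return alive;        // a range with an empty result has no live replica at all
                  }
              }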

          Jonathan Ellis added a comment -

          Sounds good, thanks!

          Mck SembWever added a comment -

          the new "one-liner" CASSANDRA-2388 attached. i'll "submit patch" once i've tested it some...

          Mck SembWever added a comment - - edited

          The idea is to set up splits to have only endpoints that are valid trackers. But now I see this is just a brainfart. Of course the jobTracker will apply this match for us. And that CFIF was always 'restricted' to running on endpoints. Although the documentation on inputSplit.getLocations() is a little thin as to whether this restricts which trackers it should run on or whether it is just a preference... I guess it doesn't matter; as you point out, Jonathan, all that's required here is the one line changed in CFRR.

          Jonathan Ellis added a comment -

          +1 to CFRR changes

          It wasn't immediately clear to me what the CFIF changes are doing; can you elaborate?

          Mck SembWever added a comment -

          Is CASSANDRA-2388-local-nodes-only-rough-sketch the direction we want then?

          This is very initial code; I can't get new JobClient(JobTracker.getAddress(conf), conf).getClusterStatus().getActiveTrackerNames() to work and need a little help here.
          (Also CFRR.getLocations() can be drastically reduced).

          Jonathan Ellis added a comment -

          another where data can be read in from anywhere

          This is totally antithetical to how hadoop is designed to work. I don't think it's worth supporting in-tree.

          Mck SembWever added a comment -

          Then I would hope for two separate InputFormats: one optimised for local node connection, where cassandra is deemed the more important system over hadoop, and another where data can be read in from anywhere. I think the latter should be supported in some manner since users may not always have the possibility to install hadoop and cassandra on the same servers, or they might not think it to be such a critical part (e.g. if CFIF is reading using an IndexClause the input data set might be quite small and the remaining code in the m/r job may be the bulk of the processing...)

          Jonathan Ellis added a comment -

          this means a dead c* node will bring down a TT

          Again: this is what you want to happen. As long as the C* process on the same node is down, you want the TT to be blacklisted and the jobs to go elsewhere.

          In a hadoop cluster with 3 nodes this means for 24hrs you've lost 33% throughput

          Right, but the real cause is that the C* process is dead, not b/c the TT is blacklisted. Making the TT read from other nodes will only hurt your network, not fix the throughput problem, b/c i/o is the bottleneck.

          Mck SembWever added a comment - - edited

          tlipcon says it comes back after 24h

          Just to be clear about my concerns:
          this means a dead c* node will bring down a TT. In a hadoop cluster with 3 nodes this means for 24hrs you've lost 33% throughput. (If less than 10% of hadoop jobs used CFIF I could well imagine some pissed users.) (What if you have a temporary problem with flapping c* nodes and you end up with a handful of blacklisted TTs? etc etc etc.)

          All this when using a replica, any replica, could have kept things going smoothly, the only slowdown being that some of the data into CFIF had to go over the network instead...

          Jonathan Ellis added a comment -

          a blacklisted TT doesn't automatically come back

          tlipcon says it comes back after 24h, fwiw. In any case it's still the case that we DO want to blacklist it while it's down. (Brisk could perhaps add a "clear my tasktracker on restart" operation as a further enhancement.)

          I'm fine with dropping support for non-colocated TTs

          +1, it was a bad idea and I'm sorry I wrote it.

          Brandon Williams added a comment -

          This is making the presumption that the hadoop cluster is only used with CFIF.
          The TT could still be useful for other jobs submitted.

          I'm fine with that assumption. If you want to run other jobs, use a different cluster. Cassandra's JVM is just wasting memory at that point.

          Furthermore a blacklisted TT doesn't automatically come back - it needs to be manually restarted. Isn't this creating more headaches for operations?

          I don't think this is actually the case, see HADOOP-4305

          I dont think we should require the TT to be running locally. The whole idea is to support access to Cassandra data from hadoop even if it's just an import.

          This patch does spend a lot of time dealing with non local data for that reason.

          I'm fine with dropping support for non-colocated TTs, or at least saying there's no DC-specific support. Because frankly, transferring the data across the network all the time is a very suboptimal thing to do and flies in the face of Hadoop's core principles.

          T Jake Luciani added a comment -

          I dont think we should require the TT to be running locally. The whole idea is to support access to Cassandra data from hadoop even if it's just an import.

          This patch does spend a lot of time dealing with non local data for that reason.

          Mck SembWever added a comment -

          If the cassandra node where the TT resides isn't working, then throughput is reduced regardless.

          Right: we want it to be blacklisted in that scenario.

          This is making the presumption that the hadoop cluster is only used with CFIF.
          The TT could still be useful for other jobs submitted.
          Furthermore a blacklisted TT doesn't automatically come back - it needs to be manually restarted. Isn't this creating more headaches for operations?

          Jonathan Ellis added a comment -

          This sounds like the thing we need to fix, then. Ensuring that the TT assigned to the map has a local replica.

          reverted 1139358, 1139483 to make a fresh start for this.

          how do we "ensure" this? isn't that the JT's job, to send jobs to the splits we gave it from CFIF? (which does make sure that only nodes with the data, are included in the split source list.)

          Jonathan Ellis added a comment -

          If the cassandra node where the TT resides isn't working, then throughput is reduced regardless.

          Right: we want it to be blacklisted in that scenario.

          Brandon Williams added a comment - - edited

          This does happen already (I've seen it while testing initial patches that were no good).
          Problem is that the TT is blacklisted, reducing hadoop's throughput for all jobs running.

          If the cassandra node where the TT resides isn't working, then throughput is reduced regardless.

          I bet too that a fallback to a replica is faster than a fallback to another TT.

          I doubt that for any significant job. Locality is important. Move the job to the data, not the data to the job.

          There is no guarantee that any given TT will have its split accessible via a local c* node - this is only a preference in CFRR. A failed task may just as likely go to a random c* node. At least now we can actually properly limit to the one DC and sort by proximity.

          This sounds like the thing we need to fix, then. Ensuring that the TT assigned to the map has a local replica.

          Mck SembWever added a comment - - edited
          • This does happen already (I've seen it while testing initial patches that were no good).
            Problem is that the TT is blacklisted, reducing hadoop's throughput for all jobs running.
            I bet too that a fallback to a replica is faster than a fallback to another TT.
          • There is no guarantee that any given TT will have its split accessible via a local c* node - this is only a preference in CFRR. A failed task may just as likely go to a random c* node. At least now we can actually properly limit to the one DC and sort by proximity.
          • One thing we're not doing here is applying this same DC limit and sort by proximity in the case when there isn't a localhost preference. See CFRR.initialize(..)
            It would make sense to rewrite CFRR.getLocations(..) to
                private Iterator<String> getLocations(final Configuration conf) throws IOException
                {
                    return new SplitEndpointIterator(conf);
                }

            and then to move the finding-a-preference-to-localhost code into SplitEndpointIterator...

          • A bug I can see in the patch that did get accepted already is in CassandraServer.java:763: when endpointValid is false and restrictToSameDC is true, we end up restricting to a random DC. I could fix this so restrictToSameDC is disabled in such situations, but this actually invalidates the previous point: we can't restrict to DC anymore and we can only sortByProximity to a random node... I think this supports Jonathan's point that it's overall a poor approach. I'm more and more in favour of my original approach using just client.getDatacenter(..) and not worrying about proximity within the datacenter.
          • Another bug is that, contrary to my patch, the code committed

            committed with a change to use the dynamic snitch if the passed endpoint is valid.

            can call DynamicEndpointSnitch.sortByProximity(..) with an address that is not localhost and this breaks the assertion in the method.

          Brandon Williams added a comment -

          If we ARE in that situation, the "right" solution would be to send the job to a TT whose local replica IS live, not to read the data from a nonlocal replica. How can we signal that?

          ISTM the right thing to do in that situation is to just fail and let the JT reschedule somewhere else.

          Mck SembWever added a comment - - edited

          CASSANDRA-2388-addition1.patch: Simplify CFRR now that multiple initialAddresses are supported.

          Jonathan Ellis added a comment -

          Taking a step back: aren't we optimizing for (1) a corner case with (2) the wrong solution?

          Here's what I mean:

          1) CFRR already prioritizes the local replica. So if you have >= one TT for each replica, this only helps if the local C* node dies, BUT the TT does not. This doesn't happen often.

          2) If we ARE in that situation, the "right" solution would be to send the job to a TT whose local replica IS live, not to read the data from a nonlocal replica. How can we signal that?

          Jonathan Ellis added a comment -

          CFRR's local node is the right and only node worth sorting against, it being the "task tracker node".

          Right.

          Then it is a random c* node that becomes the "local node"

          We still want to sort by proximity-to-TT, because CFRR connects directly to the split owner to do the reads. initialAddress isn't involved post-split-discovery.

          Again, all the complexity goes away if we just embed the snitch into CFIF/TT.

          One wrinkle: ec2snitch requires gossip, so TT would need a separate local ip to participate in the gossip ring. We could make that optional (and fall back to old "recognize local data, otherwise you get a random replica" behavior otherwise).

          Mck SembWever added a comment - - edited

          It looks like there's a ton of effort put in to avoiding making sortByProximity work w/ non-local nodes

          Because it's only when that local node is down that we actually need to sort...
          When/if DynamicEndpointSnitch's limitation is fixed (and it can sort by non-local nodes) then CassandraServer.java need not bypass it. But this won't simplify the code in CFRR. Now that CFIF supports multiple initialAddresses, the method CFRR.sortEndpointsByProximity(..) can be rewritten (i.e. any connection to any initialAddress is all we need; no need to mess around with trying to connect through replicas to find information about replicas...)

          Wait, why do we even care? "local node" IS the right host to sort against

          Depends on whether this is CFRR's "local node" or CassandraServer's "local node"...
          CFRR's local node is the right and only node worth sorting against, it being the "task tracker node".
          But when c* on the "task tracker node" is down, then we randomly connect to another c* node so as to find out, of the replicas we know about, which are 1) up, 2) closest, and 3) in the same DC. Then it is a random c* node that becomes the "local node" and the call needs to be snitch.sortByProximity(initialAddress, addresses).
          But yes... the CFRR code is contorted. In many ways I prefer the simplicity of the first patch (both in api and in implementation) despite it not being "as correct". I thought of this "fallback to replica" as a last resort to keep the m/r job running, rather than an actively used feature where DynamicEndpointSnitch's scores will maximise performance. But then I'm only thinking in terms of a small c* cluster and I certainly am naive about what performance gains these scores can give...

          Jonathan Ellis added a comment -

          It looks like there's a ton of effort put in to avoiding making sortByProximity work w/ non-local nodes

          Wait, why do we even care? "local node" IS the right host to sort against – we want the split that is closest to the node running the job; this is not the same as some other C* node we contact.

          Jonathan Ellis added a comment -

          I think there's deep surgery to be done here still though. Backporting is probably premature.

          Jonathan Ellis added a comment -

          With svn: svn diff -r 1139323:1139483 and hack out the OutboundTcpConnection change in the middle manually from the output.

          With git: create a branch, rip out the offending OTC change and squash the other two.

          Jeremy Hanna added a comment -

          Jonathan - is it possible to attach an updated patch based on your changes to 0.8 branch? Not sure if that would be simple to extract.

          Jonathan Ellis added a comment -

          Also: running hadoop on a non-cassandra node is dumb. I don't see a point in supporting that, really. (Yes, my fault it was written that way to begin with, mea culpa.)

          Jonathan Ellis added a comment -

          Took a look at this belatedly. I don't understand the contortions at all. It looks like there's a ton of effort put in to avoiding making sortByProximity work w/ non-local nodes. Why not just make that work instead?

          Jeremy Hanna added a comment -

          Reopening for testing against 0.7.6.

          Jeremy Hanna added a comment -

          I've done basic testing with the word count and pig examples to make sure that the basic hadoop integration isn't negatively affected by this. I'll also try it against our dev cluster before and after the patch - killing one node to see if it fails over to another replica - to make sure it does what it should that way.

          Jeremy Hanna added a comment - - edited

          This patch applies to the current 0.7-branch with minimal problems - just some imports on CassandraServer that it couldn't resolve properly. Can this be committed against 0.7-branch for inclusion in 0.7.7?

          Hudson added a comment -

          Integrated in Cassandra-0.8 #191 (See https://builds.apache.org/job/Cassandra-0.8/191/)
          Change ColumnFamilyRecordReader to read split from replicas if primary is down

          Patch by Mck SembWever; reviewed by tjake for CASSANDRA-2388

          jake : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1139358
          Files :

          • /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
          • /cassandra/branches/cassandra-0.8/interface/cassandra.thrift
          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java
          T Jake Luciani added a comment -

          committed with a change to use the dynamic snitch if the passed endpoint is valid.

          Mck SembWever added a comment -

          Up to date patch.
          Follows T Jake's points (1),(2), and (4).
          And bypasses DynamicEndpointSnitch when sorting by proximity.

          Mck SembWever added a comment - - edited

          The problem with the suggested approach is that sortByProximity(..) only works when address is the local address. See the assert statement at DynamicEndpointSnitch:134

          I could hack this and rewrite the line to

          // fall back to the snitch wrapped by DynamicEndpointSnitch, since its
          // sortByProximity(..) asserts that the reference address is the local one
          IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
          snitch = snitch instanceof DynamicEndpointSnitch ? ((DynamicEndpointSnitch)snitch).subsnitch : snitch;
          snitch.sortByProximity(address, addresses);

          But this of course means that we always bypass DynamicEndpointSnitch's "scores".
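          For illustration only (not part of any attached patch), a minimal sketch of keeping the scores in the common case and bypassing DynamicEndpointSnitch only when the reference address is non-local, using the same classes as the hack above:

              // Hypothetical variant of the hack above: keep DynamicEndpointSnitch's
              // scores when sorting relative to the local address, and only fall back
              // to its subsnitch for non-local addresses (which its assert rejects).
              IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
              if (snitch instanceof DynamicEndpointSnitch && !address.equals(FBUtilities.getLocalAddress()))
              {
                  snitch = ((DynamicEndpointSnitch) snitch).subsnitch;
              }
              snitch.sortByProximity(address, addresses);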

          Mck SembWever added a comment - - edited

          [snip] One possibility is to use the ip octets like the RackInferringSnitch.

          In our use case we have three nodes defined via PropertyFileSnitch:

          152.90.241.22=DC1:RAC1 #node1
          152.90.241.23=DC2:RAC1 #node2
          152.90.241.24=DC1:RAC1 #node3

          The only way to infer here would be that even addresses belong to one DC and odd addresses to the other. This is not how RackInferringSnitch works.

          When we make the connection through the "other" (node2) endpoint, the rack-inferring approach on "152.90." will say it's in DC2. Again this is the wrong DC, and it will return itself as a valid endpoint....

          Step (3) seems to me to be too specific to be included here.
          If I go only with steps (1), (2), and (4) we get this code:

              public String[] sort_endpoints_by_proximity(String endpoint, String[] endpoints, boolean restrictToSameDC) 
                      throws TException, InvalidRequestException
              {
                  try
                  {
                      List<String> results = new ArrayList<String>();
                      InetAddress address = InetAddress.getByName(endpoint);
                      boolean endpointValid = null != Gossiper.instance.getEndpointStateForEndpoint(address);
                      String datacenter = DatabaseDescriptor
                              .getEndpointSnitch().getDatacenter(endpointValid ? address : FBUtilities.getLocalAddress());
                      List<InetAddress> addresses = new ArrayList<InetAddress>();
                      for(String ep : endpoints)
                      {
                        addresses.add(InetAddress.getByName(ep));
                      }
                      DatabaseDescriptor.getEndpointSnitch().sortByProximity(address, addresses);
                      for(InetAddress ep : addresses)
                      {
                          String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(ep);
                          if(FailureDetector.instance.isAlive(ep) && (!restrictToSameDC || datacenter.equals(dc)))
                          {
                              results.add(ep.getHostName());
                          }
                      }
                      return results.toArray(new String[results.size()]);
                  }
                  catch (UnknownHostException e)
                  {
                      throw new InvalidRequestException(e.getMessage());
                  }
              }

          I'm happy with this (except that Gossiper.instance.getEndpointStateForEndpoint(address) is only my guess on how to tell if an endpoint is valid as such).

          T Jake Luciani added a comment - - edited

          I think the core issue is you can't assume the hadoop node is running on a cassandra node...

          If it is then the logic is straight forward, if not then it's possible the connection could cross DC boundaries. One possibility is to use the ip octets like the RackInferringSnitch.

          How's this proposal then? Keep the sort_endpoints_by_proximity signature as-is, pass the client endpoint along with the list of data endpoints, and add the following logic:

          1) sort the endpoints using the endpoint_snitch.
          2) if the client endpoint is a valid cassandra node, get that node's DC and prune nodes outside of this DC
          3) if the client endpoint is not a valid cassandra node, try to infer the DC from its ip and prune data endpoint nodes in a different DC (see the sketch after this list). If no cassandra nodes are in the DC list goto 4).
          4) if all else fails, return the sorted endpoint list
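          For illustration only, a minimal sketch of the octet-based inference in step (3), in the spirit of RackInferringSnitch (which reads the datacenter from the second octet); the helper name is made up:

              // Hypothetical helper: derive a pseudo-datacenter from the second IP
              // octet, as RackInferringSnitch does, for a client endpoint that is
              // not itself a cassandra node.
              private static String inferDatacenterFromIp(InetAddress addr)
              {
                  return Integer.toString(addr.getAddress()[1] & 0xFF);
              }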

          Mck SembWever added a comment - - edited

          Won't the sorting still be wrong?
          For the use-case above it will solve restricting to the correct datacenter, but the sorting will still be based on proximity to the wrong node?

          I don't think it makes sense to send the client endpoint to this call since the endpoint might not be a cassandra node.

          It might not be an alive cassandra node, but it should be a cassandra node. It comes from the split's list of endpoints. At least in this use-case, or are you referring to general usage for this new api?

          It's a reasonable assumption that the endpoint it's talking to is local enough to the client to use that.

          I don't think so... The endpoint that it talks to is completely random (just the next endpoint listed in the split's list). This is why I think that such sorting won't just be wrong, it won't even be close. Does this make sense?

          T Jake Luciani added a comment -

          OK, but why not change the response to map<string,list<string>>, where the key is the DC and the value is the proximity-sorted endpoints?
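          For illustration only, a rough server-side sketch of building such a response, reusing the snitch and failure-detector calls already quoted in this thread and assuming addresses has already been sorted by proximity (names are made up):

              // Hypothetical: group alive endpoints by datacenter, preserving the
              // proximity order produced by sortByProximity(..).
              Map<String, List<String>> endpointsByDc = new LinkedHashMap<String, List<String>>();
              for (InetAddress ep : addresses)
              {
                  if (!FailureDetector.instance.isAlive(ep))
                      continue;
                  String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(ep);
                  List<String> dcEndpoints = endpointsByDc.get(dc);
                  if (dcEndpoints == null)
                  {
                      dcEndpoints = new ArrayList<String>();
                      endpointsByDc.put(dc, dcEndpoints);
                  }
                  dcEndpoints.add(ep.getHostName());
              }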

          Mck SembWever added a comment - - edited

          public String[] sort_endpoints_by_proximity(String endpoint, String[] endpoints, boolean restrictToSameDC)

          I don't think it makes sense to send the client endpoint to this call since the endpoint might not be a cassandra node. It's a reasonable assumption that the endpoint it's talking to is local enough to the client to use that.

          For the test set I was running against (RF=2), each split has two endpoints, always in different datacenters.

          If the "local" endpoint is down, getLocations() will call client.sort_endpoints_by_proximity(..) and this will fail (being the same endpoint).
          It then makes a client connection through the "other" endpoint. [see CFRR.describeDatacenter(..)].
          This will presume the wrong datacenter and return itself as a valid endpoint.
          I need some way to know what the original datacenter is, even when it is down.

          T Jake Luciani added a comment -

          what is the lost performance of writing to the furthest node that's within the same datacenter?

          The benefit is really the DynamicSnitch. If a node is slow due to compaction, this would avoid sending requests there...

          public String[] sort_endpoints_by_proximity(String endpoint, String[] endpoints, boolean restrictToSameDC)

          I don't think it makes sense to send the client endpoint to this call since the endpoint might not be a cassandra node. It's a reasonable assumption that the endpoint it's talking to is local enough to the client to use that.

          Mck SembWever added a comment - - edited

          Just to make sure I understand you, T Jake: you would rather have something like this in CassandraServer.java?
          (I've renamed from the previous comment get_endpoints_in_same_datacenter(..) to sort_endpoints_by_proximity(..))

              public String[] sort_endpoints_by_proximity(String endpoint, String[] endpoints, boolean restrictToSameDC) 
                  throws TException, InvalidRequestException
              {
                  try
                  {
                      List<String> results = new ArrayList<String>();
                      InetAddress address = InetAddress.getByName(endpoint);
                      String datacenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(address);
                      List<InetAddress> addresses = new ArrayList<InetAddress>();
                      for(String ep : endpoints)
                      {
                          addresses.add(InetAddress.getByName(ep));
                      }
                      DatabaseDescriptor.getEndpointSnitch().sortByProximity(address, addresses);
                      for(InetAddress ep : addresses)
                      {
                          String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(ep);
                          if(FailureDetector.instance.isAlive(ep) && (!restrictToSameDC || datacenter.equals(dc)))
                          {
                              results.add(ep.getHostName());
                          }
                      }
                      return results.toArray(new String[results.size()]);
                  }
                  catch (UnknownHostException e)
                  {
                      throw new InvalidRequestException(e.getMessage());
                  }
              }
          
          Mck SembWever added a comment - - edited

          Then (if I understand you correctly) I would need this in cassandra.thrift:

                /** returns alive endpoints, sorted by proximity, that belong in the same datacenter as the given endpoint */
            list<string> get_endpoints_in_same_datacenter(1: string endpoint, 2: required list<string> endpoints)
              throws (1:InvalidRequestException ire)
          

          Then the API becomes quite specific to this use case. Is the performance gain worth it? What's the cost of each client.describe_datacenter(..) call, and, probably more important, what is the lost performance of writing to the furthest node that's within the same datacenter?
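          For illustration only, client-side use of that hypothetical thrift method (assuming Cassandra.java were re-generated; preferredEndpoint is a made-up variable) might look like:

              // Hypothetical: ask the server for the alive, proximity-sorted endpoints
              // in the same datacenter as this task's preferred endpoint.
              List<String> candidates = client.get_endpoints_in_same_datacenter(
                      preferredEndpoint, Arrays.asList(split.getLocations()));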

          T Jake Luciani added a comment -

          The get_rack seems unused so it should be removed.

          Also, it might be better to pass all locations in the get_datacenter thrift call since you can get the results in one shot and sort them by the dynamic snitch, filtering out the dead nodes:

            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints); 
          
            FailureDetector.instance.isAlive(endpoint)
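          For illustration only (not from any attached patch), the two calls above combined into a rough sketch:

              // Hypothetical: sort all candidate endpoints from the local node's point
              // of view, then drop the ones the failure detector considers dead.
              DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
              for (Iterator<InetAddress> it = endpoints.iterator(); it.hasNext(); )
              {
                  if (!FailureDetector.instance.isAlive(it.next()))
                      it.remove();
              }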
          
          Mck SembWever added a comment -

          New patch. I think I'm at last happy with it.

          getLocations() returns an iterator so client.describe_datacenter() is only called when necessary.

          Rather than providing a list in initialAddress, it was possible to use either the initialAddress OR the endpoint. This has the benefit of not listing a location that can't actually be connected to.

          The "only use replica from same DC" is an option now in ConfigHelper. By default it is true.

          Again the re-generated Cassandra.java is not included in the patch.

          I have tested this on normal jobs, and RF=2 jobs with a node down.
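          For illustration only, a rough job-setup sketch; the exact ConfigHelper setter name for the same-DC option isn't shown in this thread, so that call is hypothetical:

              // Hypothetical job setup: prefer the local cassandra node, and keep the
              // (default) restriction of only falling back to replicas in the same DC.
              Configuration conf = job.getConfiguration();
              ConfigHelper.setInitialAddress(conf, "localhost");
              // ConfigHelper.setOnlyUseReplicasInSameDC(conf, true);  // hypothetical setter name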

          Mck SembWever added a comment -

          I have tested this now on data w/ RF=2.
          Seems to work ~OK as far as I can see.

          One side-effect of this patch is that where one could once configure ConfigHelper.setInitialAddress(conf, "localhost"), this will no longer work for tasks trying to run on the down node.
          ColumnFamilyRecordReader.getLocations() will throw a ConnectException trying to call describe_datacenter(..). This will lead to the task failing. Hadoop then re-runs the task on another node and eventually the job will complete. But the fallback to a replica is never used.

          If the initialAddress is hardcoded to one node then we no longer have a decentralised job.

          I would like to allow a comma-separated list in initialAddress; for example it could be "localhost, node01, node02, node03". This would give preference to localhost and avoid any centralisation.
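          For illustration only, a rough sketch of how CFRR might walk such a list; connectTo(..) is a made-up stand-in for the existing createConnection(..) logic:

              // Hypothetical: try each configured initial address in order, so
              // "localhost" is preferred but the job isn't pinned to a single node.
              String initialAddresses = "localhost,node01,node02,node03";
              Cassandra.Client client = null;
              for (String host : initialAddresses.split(","))
              {
                  try
                  {
                      client = connectTo(host.trim());
                      break;
                  }
                  catch (TException e)
                  {
                      // this node is down or unreachable; try the next one
                  }
              }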

          I would also like to make ColumnFamilyRecordReader.getLocations() return an iterator instead of an array.
          The createConnection(..) and client.describe_datacenter(..) calls are an unnecessary overhead when all nodes (or first endpoint location) are up, and could be avoided by lazy-loading the list.

          Mck SembWever added a comment -

          Second attempt. (God only knows what I was trying to test with the last patch.)
          This patch:

          • adds describe_datacenter and describe_rack to cassandra.thrift
          • adds locations in ColumnFamilyRecordReader from the split's alternative endpoints if dc is the same

          This patch does not include the required re-generated Cassandra.java

          Mck SembWever added a comment - - edited

          Initial attempt at a solution, although I'm a little apprehensive about the additions to cassandra.thrift
          (describe_rack(..) isn't used anywhere; it just made sense to add describe_datacenter(..) and describe_rack(..) at the same time).

          I've tested that existing hadoop jobs work, but the new functionality hasn't been tested (as I currently don't have any RF=2 data set up).

          This patch does not include the required re-generated Cassandra.java

          Mck SembWever added a comment -

          How do i obtain the DataCenter name for a given address?

          IEndpointSnitch.getDataCenter(inetAddress) would work nicely for me, but how do I get the snitch client-side?

          Mck SembWever added a comment -

          I'm currently having a go at CASSANDRA-1125 so I might as well look at this too. (But you've caught me on a holiday week...)

          Jonathan Ellis added a comment -

          Mck, do you want to take a stab at this?

          T Jake Luciani added a comment - - edited

          We need to return the list of replicas in the same DC

          Jonathan Ellis added a comment -

          Eldon, are you planning to take another stab at this?

          Brandon Williams added a comment -

          Unfortunately, I thought of another problem here. If we go over the entire replica set, we're potentially going outside of the DC, which is bad since a lot of installations have a DC dedicated to analytics so it doesn't affect their app. It seems that the local address is preferred, though; are your task trackers not on the same machines as Cassandra?

          Brandon Williams added a comment -

          I'm not sure special-casing NoRouteToHostException to be blacklisted is the best thing to do. I don't think connections are being set up so often that maintaining a blacklist for any reason is needed.


            People

            • Assignee:
              Mck SembWever
              Reporter:
              Eldon Stegall
              Reviewer:
              Jonathan Ellis
            • Votes:
              8
              Watchers:
              11

                Development