Bug Category:Correctness - API / Semantic Implementation
Discovered By:User Report
Source Control Link:
In 3.0 CqlInputFormat switched away from thrift in favor of a new system.size_estimates table, but the semantics changed when dealing with multiple DCs or when Cassandra is not collocated with Hadoop.
The core issues are:
- system.size_estimates uses the primary range, in a multi-dc setup this could lead to uneven ranges
Using NetworkTopologyStrategy the primary ranges are: [0, 1), [1, 2), [2, 10), [10, 11), [11, 12), [12, 20), [20, 21), [21, 22), [22, 30), [30, 31), [31, 32), [32, 0).
Given this the only ranges that are more than one token are: [2, 10), [12, 20), [22, 30).
- system.size_estimates is not replicated so need to hit every node in the cluster to get estimates, if nodes are down in the DC with non-size-1 ranges there is no way to get a estimate.
- CqlInputFormat used to call describe_local_ring so all interactions were with a single DC, the java driver doesn't filter the DC so looks to allow cross DC traffic and includes nodes from other DCs in the replica set; in the example above, the amount of splits went from 4 to 12.
- CqlInputFormat used to call describe_splits_ex to dynamically calculate the estimates, this was on the "local primary range" and was able to hit replicas to create estimates if the primary was down. With system.size_estimates we no longer have backup and no longer expose the "local primary range" in multi-dc.
- CqlInputFormat had a config cassandra.input.keyRange which let you define your own range. If the range doesn't perfectly match the local range then the intersectWith calls will produce ranges with no estimates. Example: [0, 10, 20], cassandra.input.keyRange=5,15. This won't find any estimates so will produce 2 splits with 128 estimate (default when not found).
- CqlInputFormat special cases Cassandra being collocated with Hadoop and assumes this when querying system.size_estimates as it doesn't filter to the specific host, this means that non-collocated deployments randomly select the nodes and create splits with ranges the hosts do not have locally.
The problems are deterministic to replicate, the following test will show it
1) deploy a 3 DC cluster with 3 nodes each
2) create DC2 tokens are +1 of DC1 and DC3 are +1 of DC2
3) CREATE KEYSPACE simpleuniform0 WITH replication =
4) CREATE TABLE simpletable0 (pk bigint, ck bigint, value blob, PRIMARY KEY (pk, ck))
5) insert 500k partitions uniformly: [0, 500,000)
6) wait until estimates catch up to writes
7) for all nodes, SELECT * FROM system.size_estimates
You will get the following
8) create a MR job against simpleuniform0. simpletable0, you will get 10 splits where as 2.1 was 4