I'm trying to run a map/reduce job with BulkOutputFormat in an environment where the Hadoop nodes are dual-stack (IPv4+IPv6) and the Cassandra servers are IPv6-only. The TCP connection setup for streaming appears to explicitly set the source address to the IPv4 address of the Hadoop node, even though the destination address is IPv6.
I'm seeing connection attempts where the source address is an IPv4 address represented in IPv6 form (IPv4-mapped) and the destination is the IPv6 address of the Cassandra node.
In the log output from the Hadoop M/R job, I see:
Digging into the code, I see that org.apache.cassandra.hadoop.BulkRecordWriter successfully creates a Thrift connection to my Cassandra cluster over IPv6 and successfully retrieves the token range information.
Later on, org.apache.cassandra.streaming.FileStreamTask fails to connect to the destination Cassandra node. It seems to me that the problem is that org.apache.cassandra.net.OutboundTcpConnectionPool asks FBUtilities.getLocalAddress for the address to bind to, and getLocalAddress returns an IPv4 address when DatabaseDescriptor has not been initialized. DatabaseDescriptor has not been initialized because BulkOutputFormat does not read cassandra.yaml.
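To illustrate the mismatch described above, here is a minimal standalone sketch, not Cassandra code. The class and method names (AddressFamilyCheck, sameFamily) are hypothetical, and the addresses are placeholders from the documentation ranges. It only shows how a bind address and a destination address can be compared by IP family before a socket is bound.

```java
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper for illustration only; not part of Cassandra.
final class AddressFamilyCheck {
    /** Returns true when both addresses belong to the same IP family. */
    static boolean sameFamily(InetAddress local, InetAddress remote) {
        return (local instanceof Inet4Address && remote instanceof Inet4Address)
            || (local instanceof Inet6Address && remote instanceof Inet6Address);
    }

    public static void main(String[] args) throws UnknownHostException {
        // Literal addresses, so no DNS lookup is performed.
        InetAddress local = InetAddress.getByName("192.0.2.10");   // placeholder IPv4 bind address
        InetAddress remote = InetAddress.getByName("2001:db8::1"); // placeholder IPv6 destination
        System.out.println("same family: " + sameFamily(local, remote));
    }
}
```

With an IPv4 local address and an IPv6 destination the check reports a mismatch, which is the combination I believe the streaming code ends up with here.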
I actually have a workaround for this: do not apply the patch that removes the need to read cassandra.yaml, and instead point each Hadoop node to a cassandra.yaml generated specifically for this purpose, with listen_address set to the IPv6 address of that node.
This is with net.ipv6.bindv6only=0 in Linux sysctl - something you must have for Hadoop to run.
I also tried -D mapred.child.java.opts="-Djava.net.preferIPv4Stack=false -Djava.net.preferIPv6Addresses=true", i.e. setting properties so that the M/R job prefers the IPv6 stack, but it didn't help.
In this case, we would probably be better off not explicitly binding to any address - the OS would pick the source address for us. I understand that binding explicitly makes sense when this code is running inside the Cassandra server.
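A rough sketch of what I mean, using only plain java.net classes. This is not the OutboundTcpConnectionPool code, and the names here (OptionalBindConnect, connect, localOrNull) are made up for illustration. The idea is to bind only when a local address has actually been configured, and otherwise let the OS choose the source address.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical sketch; names are illustrative and not taken from Cassandra.
final class OptionalBindConnect {
    /**
     * Connects to remote. Binds to localOrNull only when it is non-null;
     * otherwise the socket stays unbound and the OS picks the source address.
     */
    static Socket connect(InetAddress localOrNull, InetSocketAddress remote, int timeoutMillis)
            throws IOException {
        Socket socket = new Socket();
        try {
            if (localOrNull != null) {
                socket.bind(new InetSocketAddress(localOrNull, 0)); // port 0 = any free port
            }
            socket.connect(remote, timeoutMillis);
            return socket;
        } catch (IOException e) {
            socket.close(); // do not leak the socket on a failed bind or connect
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Local listener standing in for the remote node.
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = connect(null, new InetSocketAddress(loopback, server.getLocalPort()), 2000)) {
            System.out.println("OS chose local address " + client.getLocalAddress());
        }
    }
}
```

Inside the server, the caller would pass the configured listen address; in the Hadoop client case it would pass null.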