Description
The problem is caused by the cluster having 2300+ tokens due to vnodes.
On the client side I get this exception:
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:691)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:943)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1336)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:132)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1054)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1071)
    at org.apache.hadoop.mapred.JobClient.access$700(JobClient.java:179)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:983)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:936)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:936)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:550)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:580)
    at com.relateiq.hadoop.cassandra.etl.CassandraETLJob.run(CassandraETLJob.java:58)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at com.relateiq.hadoop.cassandra.etl.CassandraETLJob.main(CassandraETLJob.java:149)
The problem seems to be in AbstractColumnFamilyInputFormat around line 180, where the executor is created with an effectively unbounded maximum pool size (Integer.MAX_VALUE):
ExecutorService executor = Executors.newCachedThreadPool();
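For reference, the JDK javadoc documents newCachedThreadPool() as equivalent to the constructor call below. Because the SynchronousQueue has no capacity, any submit() that finds no idle worker starts a brand-new thread:

// Executors.newCachedThreadPool() expands to (per the JDK javadoc):
// the SynchronousQueue cannot hold tasks, so each submission with no
// idle worker forces the pool to start a new thread, up to MAX_VALUE.
ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                                  60L, TimeUnit.SECONDS,
                                                  new SynchronousQueue<Runnable>());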
That executor is then fed one task per token range:
for (TokenRange range : masterRangeNodes)
{
    if (jobRange == null)
    {
        // for each range, pick a live owner and ask it to compute bite-sized splits
        splitfutures.add(executor.submit(new SplitCallable(range, conf)));
    }
    else
        .....
which submits one SplitCallable per token range. Each callable blocks on a call to a live node while the loop keeps submitting, so the pool never finds an idle worker and spawns one new thread per range; with 2300+ ranges the JVM hits the OS limit on native threads.
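A standalone sketch of the pool growth, independent of Cassandra (the class name is made up for illustration; each blocking task stands in for a SplitCallable waiting on its response):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class CachedPoolGrowth
{
    public static void main(String[] args) throws Exception
    {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        final CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 2300; i++)
        {
            // Each task blocks until released, so no worker is ever idle
            // when the next submit() arrives and a new thread is created.
            executor.submit(new Runnable()
            {
                public void run()
                {
                    try { release.await(); }
                    catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                }
            });
        }
        // Prints ~2300, or dies with the OutOfMemoryError above on a constrained box.
        System.out.println("peak threads: " + executor.getLargestPoolSize());
        release.countDown();
        executor.shutdown();
    }
}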
The easy fix, unless there is a longer-term fix I'm unaware of, would be to put an upper bound on the thread pool.
Something like this:
// The bound must go in corePoolSize: with corePoolSize = 0 and an unbounded
// LinkedBlockingQueue, maximumPoolSize is never consulted and the pool would
// run at most one thread.
int maxThreads = ConfigHelper.getMaxConcurrentSplitsResolution();
ThreadPoolExecutor executor = new ThreadPoolExecutor(maxThreads, maxThreads,
                                                     60L, TimeUnit.SECONDS,
                                                     new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true); // let idle workers exit after 60s

(getMaxConcurrentSplitsResolution() would be a new ConfigHelper setting.)
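If we'd rather keep the cached pool's start-threads-on-demand behaviour instead of pre-sizing, an alternative sketch (same assumption about the new ConfigHelper setting) is to bound the cached pool directly and throttle by running overflow tasks on the submitting thread:

ExecutorService executor = new ThreadPoolExecutor(
    0, ConfigHelper.getMaxConcurrentSplitsResolution(),
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>(),
    // Once the cap is reached, run the SplitCallable on the caller's
    // thread instead of spawning another one; this naturally slows the
    // submission loop in getSplits().
    new ThreadPoolExecutor.CallerRunsPolicy());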
Shall I proceed with a patch?