Description
Running StreamingKMeans Clustering with REDUCE_STREAMING_KMEANS = true and when no estimatedDistanceCutoff is specified, throws the following error
java.lang.IllegalArgumentException: Must have nonzero number of training and test vectors. Asked for %.1f %% of %d vectors for test [10.000000149011612, 0] at com.google.common.base.Preconditions.checkArgument(Preconditions.java:120) at org.apache.mahout.clustering.streaming.cluster.BallKMeans.splitTrainTest(BallKMeans.java:176) at org.apache.mahout.clustering.streaming.cluster.BallKMeans.cluster(BallKMeans.java:192) at org.apache.mahout.clustering.streaming.mapreduce.StreamingKMeansReducer.getBestCentroids(StreamingKMeansReducer.java:107) at org.apache.mahout.clustering.streaming.mapreduce.StreamingKMeansReducer.reduce(StreamingKMeansReducer.java:73) at org.apache.mahout.clustering.streaming.mapreduce.StreamingKMeansReducer.reduce(StreamingKMeansReducer.java:37) at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177) at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)
The issue is caused by the following code in StreamingKMeansThread.call()
Iterator<Centroid> datapointsIterator = datapoints.iterator(); if (estimateDistanceCutoff == StreamingKMeansDriver.INVALID_DISTANCE_CUTOFF) { List<Centroid> estimatePoints = Lists.newArrayListWithExpectedSize(NUM_ESTIMATE_POINTS); while (datapointsIterator.hasNext() && estimatePoints.size() < NUM_ESTIMATE_POINTS) { estimatePoints.add(datapointsIterator.next()); } estimateDistanceCutoff = ClusteringUtils.estimateDistanceCutoff(estimatePoints, searcher.getDistanceMeasure()); } StreamingKMeans clusterer = new StreamingKMeans(searcher, numClusters, estimateDistanceCutoff); while (datapointsIterator.hasNext()) { clusterer.cluster(datapointsIterator.next()); }
The code is using the same iterator twice, and it fails on the second use for obvious reasons.