Description
We are running KMeans on approximately 350M rows of x, y, z coordinates using the following configuration:
KMeans(
    featuresCol='features',
    predictionCol='centroid_id',
    k=50000,
    initMode='k-means||',
    initSteps=2,
    tol=0.00005,
    maxIter=20,
    seed=SEED,
    distanceMeasure='euclidean'
)
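For reference, a minimal Scala sketch of the equivalent fit call (our actual job goes through PySpark, as the py4j frames in the stack trace below show; `coords` is a hypothetical DataFrame with a pre-assembled 'features' vector column, and SEED is a placeholder as in our configuration):

import org.apache.spark.ml.clustering.KMeans

val kmeans = new KMeans()
  .setFeaturesCol("features")
  .setPredictionCol("centroid_id")
  .setK(50000)
  .setInitMode("k-means||")
  .setInitSteps(2)
  .setTol(0.00005)
  .setMaxIter(20)
  .setSeed(SEED)
  .setDistanceMeasure("euclidean")

// Throws java.lang.NegativeArraySizeException on Spark 3.1.1 with k=50000.
val model = kmeans.fit(coords)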
This worked fine on Spark 3.0.0, but after upgrading to 3.1.1 we consistently get the error below unless we reduce K.
Stacktrace:
An error occurred while calling o167.fit.
: java.lang.NegativeArraySizeException: -897458648
	at scala.reflect.ManifestFactory$DoubleManifest.newArray(Manifest.scala:194)
	at scala.reflect.ManifestFactory$DoubleManifest.newArray(Manifest.scala:191)
	at scala.Array$.ofDim(Array.scala:221)
	at org.apache.spark.mllib.clustering.DistanceMeasure.computeStatistics(DistanceMeasure.scala:52)
	at org.apache.spark.mllib.clustering.KMeans.runAlgorithmWithWeight(KMeans.scala:280)
	at org.apache.spark.mllib.clustering.KMeans.runWithWeight(KMeans.scala:231)
	at org.apache.spark.ml.clustering.KMeans.$anonfun$fit$1(KMeans.scala:354)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:329)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Unknown Source)
The issue is introduced by #27758, which significantly reduces the maximum usable value of K. Snippet of the line that throws the error, from DistanceMeasure.scala:
val packedValues = Array.ofDim[Double](k * (k + 1) / 2)
What we have tried:
- Reducing iterations
- Reducing input volume
- Reducing K
Only reducing K has yielded success.
Possible workarounds:
- Roll back to Spark 3.0.0 entirely, since a KMeansModel generated with 3.0.0 cannot be loaded in 3.1.1 anyway.
- Reduce K. Currently trying with 45000.
What we don't understand:
Given the line of code above, we initially did not understand why we would get an integer overflow: for K=50,000, packedValues should be allocated with size 50,000 × 50,001 / 2 = 1,250,025,000, which is below 2^31 and should not produce a negative array size. The explanation appears to be evaluation order: the intermediate product k * (k + 1) is computed in 32-bit Int arithmetic, so 50,000 × 50,001 = 2,500,050,000 exceeds Int.MaxValue (2,147,483,647) and wraps to -1,794,917,296; the integer division by 2 then yields -897,458,648, exactly the negative size reported in the stack trace.
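A minimal, self-contained Scala sketch of the arithmetic (plain JVM code, not Spark; the constants come from our configuration and the stack trace):

object PackedSizeOverflow {
  def main(args: Array[String]): Unit = {
    val k = 50000

    // Evaluated in Int: 50000 * 50001 = 2500050000 wraps to -1794917296,
    // and -1794917296 / 2 = -897458648, the size in the stack trace.
    println(k * (k + 1) / 2)        // -897458648

    // Promoting to Long before multiplying gives the intended size.
    println(k.toLong * (k + 1) / 2) // 1250025000 (still ~10 GB of doubles)

    // Largest K for which k * (k + 1) still fits in an Int:
    // 46340 * 46341 = 2147441940 <= Int.MaxValue, but 46341 * 46342 is not.
    println((1 to 100000).filter(i => i.toLong * (i + 1) <= Int.MaxValue).max) // 46340
  }
}

If this analysis is right, it also suggests why the K=45,000 attempt should avoid the crash: 45,000 is below the 46,340 cutoff at which the intermediate product overflows.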
Suggested resolution:
I'm not well versed in the inner workings of KMeans, but my immediate thought would be to fall back to the pre-3.1 logic for K larger than a set threshold if the optimisation is to stay in place, as it currently breaks compatibility from 3.0.0 to 3.1.1 for these edge cases.
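For illustration only, a hedged sketch of what such a guard might look like. This is not the actual Spark code, and the function name is hypothetical; it only shows the idea of computing the size in Long and letting the caller fall back when the packed statistics array is not allocatable:

// Hypothetical guard, sketched for illustration; not the real DistanceMeasure internals.
def tryAllocatePackedStatistics(k: Int): Option[Array[Double]] = {
  // Compute the packed triangular size in Long to avoid the Int overflow at K >= 46341.
  val size = k.toLong * (k + 1) / 2
  if (size <= Int.MaxValue) Some(Array.ofDim[Double](size.toInt))
  else None // caller falls back to the pre-3.1 code path without precomputed statistics
}

A real fix might also want a lower, memory-aware threshold, since even a valid 1.25-billion-element Double array costs roughly 10 GB.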
Please let me know if more information is needed; this is my first time raising a bug for an open-source project.
Issue Links
- is related to SPARK-31007 KMeans optimization based on triangle-inequality (Resolved)