Spark / SPARK-18531

Apache Spark FPGrowth algorithm implementation fails with java.lang.StackOverflowError


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.6.1, 1.6.3
    • Component/s: MLlib
    • Labels: None

    Description

      More details can be found here: https://gist.github.com/tuxdna/37a69b53e6f9a9442fa3b1d5e53c2acb

      Spark's FPGrowth algorithm fails with a java.lang.StackOverflowError even on a small dataset, as shown below:

      $ spark-shell --master "local[*]" --driver-memory 5g
      Welcome to
            ____              __
           / __/__  ___ _____/ /__
          _\ \/ _ \/ _ `/ __/  '_/
         /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
            /_/

      Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_102)
      Spark context available as sc.
      SQL context available as sqlContext.

      scala> import org.apache.spark.mllib.fpm.FPGrowth
      import org.apache.spark.mllib.fpm.FPGrowth

      scala> import org.apache.spark.rdd.RDD
      import org.apache.spark.rdd.RDD

      scala> import org.apache.spark.sql.SQLContext
      import org.apache.spark.sql.SQLContext

      scala> import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.{SparkConf, SparkContext}

      scala> val data = sc.textFile("bug.data")
      data: org.apache.spark.rdd.RDD[String] = bug.data MapPartitionsRDD[1] at textFile at <console>:31

      scala> val transactions: RDD[Array[String]] = data.map(l => l.split(",").distinct)
      transactions: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:33

      scala> transactions.cache()
      res0: transactions.type = MapPartitionsRDD[2] at map at <console>:33

      scala> val fpg = new FPGrowth().setMinSupport(0.05).setNumPartitions(10)
      fpg: org.apache.spark.mllib.fpm.FPGrowth = org.apache.spark.mllib.fpm.FPGrowth@66d62c59

      scala> val model = fpg.run(transactions)
      model: org.apache.spark.mllib.fpm.FPGrowthModel[String] = org.apache.spark.mllib.fpm.FPGrowthModel@6e92f150

      scala> model.freqItemsets.take(1).foreach { i => i.items.mkString("[", ",", "]") + ", " + i.freq }

      [Stage 3:> (0 + 2) / 2]16/11/21 23:56:14 ERROR Executor: Managed memory leak detected; size = 18068980 bytes, TID = 14
      16/11/21 23:56:14 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 14)
      java.lang.StackOverflowError
      at org.xerial.snappy.Snappy.arrayCopy(Snappy.java:84)
      at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:273)
      at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:115)
      at org.apache.spark.io.SnappyOutputStreamWrapper.write(CompressionCodec.scala:202)
      at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
      at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
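
      The overflow happens inside Java serialization, which recurses once per object while writing the task output, so very deep object graphs exhaust the thread stack. One possible mitigation (an assumption, not a verified fix, and it does not address FP-Growth's combinatorial blow-up on very large baskets) is to enlarge the thread stack size on both the driver and the executors via the standard extraJavaOptions configs:

      $ spark-shell --master "local[*]" --driver-memory 5g \
          --conf "spark.driver.extraJavaOptions=-Xss64m" \
          --conf "spark.executor.extraJavaOptions=-Xss64m"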

      This failure is likely due to the size of the baskets, some of which contain over a thousand items:

      scala> val maxBasketSize = transactions.map(_.length).max()
      maxBasketSize: Int = 1171

      scala> transactions.filter(_.length == maxBasketSize).collect()
      res3: Array[Array[String]] = Array(Array(3858, 109, 5842, 2184, 2481, 534
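
      Since the number of candidate itemsets FP-Growth explores grows roughly exponentially with basket size, one workaround is to prune oversized baskets before mining. The sketch below makes two assumptions that are not from this report: the 100-item cutoff is arbitrary (tune it for your data), and dropping baskets changes the support counts.

      import org.apache.spark.mllib.fpm.FPGrowth
      import org.apache.spark.rdd.RDD

      // Arbitrary cutoff (assumption): drop baskets too large for FP-Growth here.
      val maxItems = 100
      val pruned: RDD[Array[String]] = transactions.filter(_.length <= maxItems)

      val model = new FPGrowth()
        .setMinSupport(0.05)
        .setNumPartitions(10)
        .run(pruned)

      // Note: mkString alone returns a string without printing it; wrap it in println.
      model.freqItemsets.take(10).foreach { i =>
        println(i.items.mkString("[", ",", "]") + ", " + i.freq)
      }

      Raising setMinSupport has a similar effect, since fewer itemsets qualify as frequent and the search space shrinks accordingly.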

      People

          • Assignee: Unassigned
          • Reporter: Saleem Ansari (tuxdna)
          • Votes: 0
          • Watchers: 2
