[SPARK-32779] Spark/Hive3 interaction potentially causes deadlock

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0, 3.1.0
    • Fix Version/s: 3.0.2, 3.1.0
    • Component/s: SQL
    • Labels: None

    Description

      This is an issue for applications that share a Spark Session across multiple threads.

      sessionCatalog.loadPartition (after checking that the table exists) grabs locks in this order:

      • HiveExternalCatalog
      • HiveSessionCatalog (in Shim_v3_0)

      Other operations (e.g., sessionCatalog.tableExists) grab locks in this order:

      • HiveSessionCatalog
      • HiveExternalCatalog

      This lock-order inversion appears to be the culprit. Perhaps the database name should be defaulted before the call to HiveClient, so that Shim_v3_0 doesn't have to call back into SessionCatalog while holding the HiveExternalCatalog lock. Or possibly the callback is not needed at all, since loadPartition in Shim_v2_1 doesn't worry about the default database name, though that might be due to differences between the Hive client libraries.
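
      Stripped of Spark specifics, the inversion reduces to a minimal sketch (plain Scala, pasteable into any REPL; the two objects are stand-ins for the catalog monitors, and the sleeps merely widen the race window):

      // Not Spark code: two monitors locked in opposite orders, mirroring the
      // HiveExternalCatalog/HiveSessionCatalog inversion described above.
      val externalCatalog = new Object  // stand-in for HiveExternalCatalog's monitor
      val sessionCatalog = new Object   // stand-in for HiveSessionCatalog's monitor

      // Path 1: like loadPartition -- external lock first, then session lock
      val t1 = new Thread(() => {
        externalCatalog.synchronized {
          Thread.sleep(100)             // widen the race window
          sessionCatalog.synchronized { println("loadPartition path got both locks") }
        }
      }, "worker0")

      // Path 2: like tableExists -- session lock first, then external lock
      val t2 = new Thread(() => {
        sessionCatalog.synchronized {
          Thread.sleep(100)
          externalCatalog.synchronized { println("tableExists path got both locks") }
        }
      }, "worker3")

      t1.start(); t2.start()
      t1.join(); t2.join()              // with the sleeps, this typically never returns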

      Reproduction case:

      • You need a running Hive 3.x metastore (HMS) instance and the appropriate hive-site.xml for your Spark instance
      • Adjust your spark.sql.hive.metastore.version accordingly
      • It might take more than one try to hit the deadlock

      Launch Spark:

      bin/spark-shell --conf "spark.sql.hive.metastore.jars=${HIVE_HOME}/lib/*" --conf spark.sql.hive.metastore.version=3.1
      

      Then use the following code:

      import scala.collection.mutable.ArrayBuffer
      import scala.util.Random
      
      // Create several empty partitioned ORC tables to load into concurrently.
      val tableCount = 4
      for (i <- 0 until tableCount) {
        val tableName = s"partitioned${i+1}"
        sql(s"drop table if exists $tableName")
        sql(s"create table $tableName (a bigint) partitioned by (b bigint) stored as orc")
      }
      
      // One thread per table: each writes an ORC dataset to a scratch directory,
      // then loads it into its own table, so the threads contend only on the catalogs.
      val threads = new ArrayBuffer[Thread]
      for (i <- 0 until tableCount) {
        threads.append(new Thread(new Runnable {
          override def run(): Unit = {
            val tableName = s"partitioned${i + 1}"
            val df = spark.range(0, 20000).toDF("a")
            val location = s"/tmp/${Random.nextLong.abs}"
            df.write.mode("overwrite").orc(location)
            sql(
              s"""
              LOAD DATA LOCAL INPATH '$location' INTO TABLE $tableName partition (b=$i)""")
          }
        }, s"worker$i"))
        threads(i).start()
      }
      
      // Wait for all workers; once the deadlock hits, these joins never return.
      for (i <- 0 until tableCount) {
        println(s"Joining with thread $i")
        threads(i).join()
      }
      println("All done")
      

      The job often gets stuck after one or two "Joining..." lines.

      A thread dump (kill -3 on the driver JVM) shows something like this:

      Found one Java-level deadlock:
      =============================
      "worker3":
        waiting to lock monitor 0x00007fdc3cde6798 (object 0x0000000784d98ac8, a org.apache.spark.sql.hive.HiveSessionCatalog),
        which is held by "worker0"
      "worker0":
        waiting to lock monitor 0x00007fdc441d1b88 (object 0x00000007861d1208, a org.apache.spark.sql.hive.HiveExternalCatalog),
        which is held by "worker3"
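
      If a full thread dump is inconvenient, the same Java-level deadlock can be detected programmatically via the JDK's ThreadMXBean (standard java.lang.management API, nothing Spark-specific), e.g. from a watchdog thread started before the workers:

      import java.lang.management.ManagementFactory

      val threadMX = ManagementFactory.getThreadMXBean
      // findDeadlockedThreads returns null when no deadlock is detected
      Option(threadMX.findDeadlockedThreads()).foreach { ids =>
        threadMX.getThreadInfo(ids, true, true).foreach { info =>
          println(s"${info.getThreadName} waiting on ${info.getLockName} " +
            s"held by ${info.getLockOwnerName}")
        }
      }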
      


          People

            Assignee: Sandeep Katta (sandeep.katta2007)
            Reporter: Bruce Robbins (bersprockets)
            Votes: 0
            Watchers: 5
