Uploaded image for project: 'Apache Ozone'
  1. Apache Ozone
  2. HDDS-4092

Writing delta to Ozone hangs when creating the _delta_log json

    XMLWordPrintableJSON

Details

    • Bug
    • Status: In Progress
    • Major
    • Resolution: Unresolved
    • 0.5.0
    • None
    • Ozone Filesystem
    • We are using Kubernetes k8s, Ozone 0.5.0beta, Spark 3.0.0, Hadoop 3.2, Scala 2.12.10, and io.delta:delta-core_2.12:0.7.0

    • Important

    Description

      I am testing writing delta, OSS not databricks, data to Ozone FS since my company is looking to replace Hadoop if feasible. However, whenever I write delta table, the parquet files are writing, the delta log directory is created, but the json is never writing. 

      I am using the spark operator to submit a batch test job to write about 5mb of data.

      Neither on the driver nor on the executor is there an error. The driver never finishes since the creation of the json hangs.

       

      Code I used for testing spark operator and then I ran the pieces in the shell for testing. In the save path, update bucket and volume info for your data store.

      package app.OzoneTest
      
      import org.apache.spark.sql.{DataFrame, SparkSession}
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.types.{BinaryType, StringType}
      
      object CreateData {
      
        def main(args: Array[String]): Unit = {
      
          val spark: SparkSession = SparkSession
            .builder()
            .appName(s"Create Ozone Mock Data")
            .enableHiveSupport()
            .getOrCreate()
      
          import spark.implicits._
      
          val df: DataFrame = Seq.fill(100000)
          {(randomID, randomLat, randomLong, randomDates, randomHour)}
            .toDF("msisdn", "latitude", "longitude", "par_day", "par_hour")
            .withColumn("msisdn", $"msisdn".cast(StringType))
            .withColumn("msisdn", sha1($"msisdn".cast(BinaryType)))
            .select("msisdn", "latitude", "longitude", "par_day", "par_hour")
      
          df
            .repartition(3, $"msisdn")
            .sortWithinPartitions("latitude", "longitude")
            .write
            .partitionBy("par_day", "par_hour")
            .format("delta")
            .save("o3fs://your_bucker.your_volume/location_data")
      
        }
      
        def randomID: Int = scala.util.Random.nextInt(10) + 1
      
        def randomDates: Int = 20200101 + scala.util.Random.nextInt((20200131 - 20200101) + 1)
      
        def randomHour: Int = scala.util.Random.nextInt(24)
      
        def randomLat: Double = 13.5 + scala.util.Random.nextFloat()
      
        def randomLong: Double = 100 + scala.util.Random.nextFloat()
      }
      

      Attachments

        Activity

          People

            elek Marton Elek
            dustin.smith.TDG Dustin Smith
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: