Uploaded image for project: 'Apache Hudi'
  1. Apache Hudi
  2. HUDI-1864

Support for java.time.LocalDate in TimestampBasedAvroKeyGenerator

Attach filesAttach ScreenshotAdd voteVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      When we read data from MySQL that has a column of type Date, Spark represents it as an instance of java.time.LocalDate. If I try to use this column for partitioning when writing to Hudi, I get the following exception:

       

      Caused by: org.apache.hudi.exception.HoodieKeyGeneratorException: Unable to parse input partition field :2021-04-21
      	at org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:136) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
      	at org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:89) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
      	at org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:64) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
      	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
      	at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$2(HoodieSparkSqlWriter.scala:160) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
      	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.Iterator.foreach(Iterator.scala:941) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.Iterator.foreach$(Iterator.scala:941) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.AbstractIterator.to(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.AbstractIterator.toArray(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
      	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_171]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_171]
      	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_171]
      Caused by: org.apache.hudi.exception.HoodieNotSupportedException: Unexpected type for partition field: java.time.LocalDate
      	at org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:208) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
      	at org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:134) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
      	at org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:89) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
      	at org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:64) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
      	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
      	at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$2(HoodieSparkSqlWriter.scala:160) ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
      	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.Iterator.foreach(Iterator.scala:941) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.Iterator.foreach$(Iterator.scala:941) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.AbstractIterator.to(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) ~[scala-library-2.12.10.jar:?]
      	at scala.collection.AbstractIterator.toArray(Iterator.scala:1429) ~[scala-library-2.12.10.jar:?]
      	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) ~[spark-core_2.12-3.1.1.jar:3.1.1]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_171]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_171]
      	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_171]
      

      Currently, the only supported column types are Double, Float, Long and CharSequence, as shown in the existing implementation:

      /**
       * Builds the partition path string for a single partition-field value.
       *
       * <p>{@code Double}, {@code Float} and {@code Long} values are handed to
       * {@code convertLongTimeToMillis}; {@code CharSequence} values are parsed with the
       * configured input formatter. The resulting millisecond value is rendered with
       * {@code partitionFormatter}, URL-encoded when {@code encodePartitionPath} is set, and
       * prefixed with {@code <first partition field>=} when {@code hiveStylePartitioning} is set.
       *
       * <p>Side effect: when a {@code CharSequence} is parsed and {@code outputDateTimeZone} is
       * {@code null}, {@code partitionFormatter} is reassigned to use the zone of the parsed value.
       *
       * @param partitionVal the raw partition-field value; must be a {@code Double}, {@code Float},
       *                     {@code Long} or {@code CharSequence}
       * @return the formatted (and optionally encoded / hive-style prefixed) partition path
       * @throws HoodieException if a {@code CharSequence} is supplied but no input formatter is
       *                         present, or if URL-encoding the path fails
       * @throws HoodieNotSupportedException if {@code partitionVal} is of any other runtime type
       */
      public String getPartitionPath(Object partitionVal) {
        initIfNeeded();
        long timeMs;
        if (partitionVal instanceof Double) {
          // longValue() truncates any fractional part before conversion.
          timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
        } else if (partitionVal instanceof Float) {
          // longValue() truncates any fractional part before conversion.
          timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue());
        } else if (partitionVal instanceof Long) {
          timeMs = convertLongTimeToMillis((Long) partitionVal);
        } else if (partitionVal instanceof CharSequence) {
          if (!inputFormatter.isPresent()) {
            throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
          }
          // Parse once; the same result supplies both the zone and the millisecond value.
          DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString());
          if (this.outputDateTimeZone == null) {
            // Use the timezone that came off the date that was passed in, if it had one
            partitionFormatter = partitionFormatter.withZone(parsedDateTime.getZone());
          }
          timeMs = parsedDateTime.getMillis();
        } else {
          throw new HoodieNotSupportedException(
              "Unexpected type for partition field: " + partitionVal.getClass().getName());
        }
        DateTime timestamp = new DateTime(timeMs, outputDateTimeZone);
        String partitionPath = timestamp.toString(partitionFormatter);
        if (encodePartitionPath) {
          try {
            partitionPath = URLEncoder.encode(partitionPath, StandardCharsets.UTF_8.toString());
          } catch (UnsupportedEncodingException uoe) {
            throw new HoodieException(uoe.getMessage(), uoe);
          }
        }
        return hiveStylePartitioning ? getPartitionPathFields().get(0) + "=" + partitionPath : partitionPath;
      }
      

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            shivnarayan sivabalan narayanan
            vaibhav.sinha Vaibhav Sinha

            Dates

              Created:
              Updated:

              Slack

                Issue deployment