SPARK-20588: from_utc_timestamp causes bottleneck


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.2
    • Fix Version/s: 2.2.0
    • Component/s: SQL
    • Labels: None
    • Environment: AWS EMR AMI 5.2.1

    Description

      We have a SQL query that makes use of the from_utc_timestamp function, like so: from_utc_timestamp(itemSigningTime, 'America/Los_Angeles')

      This causes a major bottleneck. Our exact call is:
      date_add(from_utc_timestamp(itemSigningTime,'America/Los_Angeles'), 1)

      Switching from the above to date_add(itemSigningTime, 1) reduces the job's running time from 40 minutes to 9 minutes.
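      For reference, a minimal sketch of the two variants we compared (assuming a table "events" with an itemSigningTime column; the names and input path below are illustrative, not our production job):

        import org.apache.spark.sql.SparkSession

        object FromUtcTimestampRepro {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder().appName("from_utc_timestamp repro").getOrCreate()

            // Hypothetical input; any table with an itemSigningTime column will do.
            spark.read.parquet("s3://bucket/events").createOrReplaceTempView("events")

            // Slow variant (~40 minutes for our job): converts the time zone per record.
            spark.sql(
              "SELECT date_add(from_utc_timestamp(itemSigningTime, 'America/Los_Angeles'), 1) FROM events"
            ).count()

            // Fast variant (~9 minutes): skips the time zone conversion entirely.
            spark.sql("SELECT date_add(itemSigningTime, 1) FROM events").count()
          }
        }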

      When the from_utc_timestamp function is used, several executor threads sit in the BLOCKED state with this call stack:

      "Executor task launch worker-63" #261 daemon prio=5 os_prio=0 tid=0x00007f848472e000 nid=0x4294 waiting for monitor entry [0x00007f501981c000]
      java.lang.Thread.State: BLOCKED (on object monitor)
      at java.util.TimeZone.getTimeZone(TimeZone.java:516)

      • waiting to lock <0x00007f5216c2aa58> (a java.lang.Class for java.util.TimeZone)
        at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTimestamp(DateTimeUtils.scala:356)
        at org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp(DateTimeUtils.scala)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

      Can we cache the time zone lookups once per JVM so that we don't pay this cost for every record?
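      One way to do this would be a per-JVM cache keyed by zone id, so the synchronized TimeZone.getTimeZone call runs once per distinct zone instead of once per record. A rough sketch of that idea (illustrative only, not necessarily how Spark implements the fix):

        import java.util.TimeZone
        import java.util.concurrent.ConcurrentHashMap
        import java.util.function.{Function => JFunction}

        object TimeZoneCache {
          // Per-JVM cache: the synchronized TimeZone.getTimeZone lookup is paid
          // once per distinct zone id; later lookups avoid that global lock.
          private val cache = new ConcurrentHashMap[String, TimeZone]()

          private val loader = new JFunction[String, TimeZone] {
            override def apply(id: String): TimeZone = TimeZone.getTimeZone(id)
          }

          def getTimeZone(id: String): TimeZone = cache.computeIfAbsent(id, loader)
        }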

People

    Assignee: Takuya Ueshin (ueshin)
    Reporter: Ameen Tayyebi (ameen.tayyebi@gmail.com)
    Votes: 0
    Watchers: 5

Dates

    Created:
    Updated:
    Resolved:
