Spark / SPARK-46990

Regression: Unable to load empty avro files emitted by event-hubs


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.5.0
    • Fix Version/s: 4.0.0
    • Component/s: PySpark
    • Environment: Databricks 14.0 - 14.3 (Spark 3.5.0)

    Description

      In Azure, I use Databricks and Event Hubs. Up to Spark 3.4.1 (Databricks 13.3 LTS), empty Avro files emitted by Event Hubs could be read. Since Spark 3.5.0, these files can no longer be loaded: even when I load multiple Avro files and only one of them is empty, an operation such as count or save cannot be performed. I tested this on Databricks 14.0, 14.1, 14.2, and 14.3, and it fails on all of them.

      I use the following code:
       

      df = spark.read.format("avro") \
          .load('abfss://<container>@<storage>.dfs.core.windows.net/<evh-namespace>/<evh>/0/2024/02/05/22/46/10.avro')

      df.count()  # Spark hangs on this operation
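
      To tell in advance which files are "empty" in the sense described above, the Avro object container header can be inspected directly. The following is only a sketch. It assumes that an empty Event Hubs file is a valid Avro container with a header (magic bytes, metadata map, sync marker) and no data blocks after it, which the report does not state explicitly. The names `read_long` and `has_no_data_blocks` are hypothetical helpers, not part of Spark or any Avro library.

```python
import io

AVRO_MAGIC = b"Obj\x01"  # first four bytes of an Avro object container file
SYNC_SIZE = 16           # length of the sync marker that ends the header


def read_long(stream):
    """Decode one Avro long (variable-length, zigzag-encoded)."""
    shift = 0
    result = 0
    while True:
        chunk = stream.read(1)
        if not chunk:
            raise EOFError("unexpected end of data while reading a long")
        byte = chunk[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)


def has_no_data_blocks(data):
    """Return True if the container holds only a header and no data blocks."""
    stream = io.BytesIO(data)
    if stream.read(4) != AVRO_MAGIC:
        raise ValueError("not an Avro object container file")
    # Skip the metadata map: blocks of key/value pairs, ended by a zero count.
    while True:
        count = read_long(stream)
        if count == 0:
            break
        if count < 0:
            read_long(stream)  # a negative count is followed by the block size
            count = -count
        for _ in range(count * 2):  # each key and value is length-prefixed
            length = read_long(stream)
            stream.seek(length, io.SEEK_CUR)
    if len(stream.read(SYNC_SIZE)) != SYNC_SIZE:
        raise ValueError("truncated Avro header")
    # Anything after the header would be the first data block.
    return stream.read(1) == b""
```

      The function works on bytes that have already been fetched from storage; how the file is read from `abfss://` is left out on purpose. Paths for which it returns True could then be excluded from the list passed to `load` on affected versions.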

      Below are a fragment of the Databricks logs and the query plan:

      24/02/06 10:03:10 INFO ProgressReporter$: Added result fetcher for 2734305632140666820_7640723027790427455_4f56f528d4a44796a98821713778d5f9
      24/02/06 10:03:11 INFO InMemoryFileIndex: Start listing leaf files and directories. Size of Paths: 1; threshold: 32
      24/02/06 10:03:11 INFO InMemoryFileIndex: Start listing leaf files and directories. Size of Paths: 0; threshold: 32
      24/02/06 10:03:11 INFO InMemoryFileIndex: It took 9 ms to list leaf files for 1 paths.
      24/02/06 10:03:11 INFO ProgressReporter$: Removed result fetcher for 2734305632140666820_7640723027790427455_4f56f528d4a44796a98821713778d5f9
      24/02/06 10:03:12 INFO ProgressReporter$: Added result fetcher for 2734305632140666820_6526693737104909881_a07acddb350f44a284cac52db0b2fb21
      24/02/06 10:03:12 INFO ClusterLoadMonitor: Added query with execution ID:38. Current active queries:1
      24/02/06 10:03:12 INFO FileSourceStrategy: Pushed Filters: 
      24/02/06 10:03:12 INFO FileSourceStrategy: Post-Scan Filters: 
      24/02/06 10:03:12 INFO CodeGenerator: Code generated in 10.636308 ms
      24/02/06 10:03:12 INFO MemoryStore: Block broadcast_34 stored as values in memory (estimated size 409.5 KiB, free 3.3 GiB)
      24/02/06 10:03:12 INFO MemoryStore: Block broadcast_34_piece0 stored as bytes in memory (estimated size 14.5 KiB, free 3.3 GiB)
      24/02/06 10:03:12 INFO BlockManagerInfo: Added broadcast_34_piece0 in memory on <IP_ADDRESS_2>:43781 (size: 14.5 KiB, free: 3.3 GiB)
      24/02/06 10:03:12 INFO SparkContext: Created broadcast 34 from $anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63
      24/02/06 10:03:12 INFO FileSourceScanExec: Planning scan with bin packing, max split size: 4194304 bytes, max partition size: 4194304, open cost is considered as scanning 4194304 bytes.
      24/02/06 10:03:12 INFO DAGScheduler: Registering RDD 104 ($anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63) as input to shuffle 11
      24/02/06 10:03:12 INFO DAGScheduler: Got map stage job 22 ($anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63) with 1 output partitions
      24/02/06 10:03:12 INFO DAGScheduler: Final stage: ShuffleMapStage 31 ($anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63)
      24/02/06 10:03:12 INFO DAGScheduler: Parents of final stage: List()
      24/02/06 10:03:12 INFO DAGScheduler: Missing parents: List()
      24/02/06 10:03:12 INFO DAGScheduler: Submitting ShuffleMapStage 31 (MapPartitionsRDD[104] at $anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63), which has no missing parents
      24/02/06 10:03:12 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 31 (MapPartitionsRDD[104] at $anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63) (first 15 tasks are for partitions Vector(0))
      24/02/06 10:03:12 INFO TaskSchedulerImpl: Adding task set 31.0 with 1 tasks resource profile 0
      24/02/06 10:03:12 INFO TaskSetManager: TaskSet 31.0 using PreferredLocationsV1
      24/02/06 10:03:12 WARN FairSchedulableBuilder: A job was submitted with scheduler pool 2734305632140666820, which has not been configured. This can happen when the file that pools are read from isn't set, or when that file doesn't contain 2734305632140666820. Created 2734305632140666820 with default configuration (schedulingMode: FIFO, minShare: 0, weight: 1)
      24/02/06 10:03:12 INFO FairSchedulableBuilder: Added task set TaskSet_31.0 tasks to pool 2734305632140666820
      24/02/06 10:03:12 INFO TaskSetManager: Starting task 0.0 in stage 31.0 (TID 449) (<IP_ADDRESS>, executor 3, partition 0, PROCESS_LOCAL, 
      24/02/06 10:03:12 INFO MemoryStore: Block broadcast_35 stored as values in memory (estimated size 137.2 KiB, free 3.3 GiB)
      24/02/06 10:03:12 INFO MemoryStore: Block broadcast_35_piece0 stored as bytes in memory (estimated size 41.3 KiB, free 3.3 GiB)
      24/02/06 10:03:12 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on <IP_ADDRESS_2>:43781 (size: 41.3 KiB, free: 3.3 GiB)
      24/02/06 10:03:12 INFO SparkContext: Created broadcast 35 from broadcast at TaskSetManager.scala:723
      24/02/06 10:03:12 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on <IP_ADDRESS>:40825 (size: 41.3 KiB, free: 3.6 GiB)
      24/02/06 10:03:13 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on <IP_ADDRESS>:40825 (size: 17.6 KiB, free: 3.6 GiB)
      24/02/06 10:03:14 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 0.0, New Ema: 1.0 
      24/02/06 10:03:15 INFO BlockManagerInfo: Added broadcast_34_piece0 in memory on <IP_ADDRESS>:40825 (size: 14.5 KiB, free: 3.6 GiB)
      24/02/06 10:03:17 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:20 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:23 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:26 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:29 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:32 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:35 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:38 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:41 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:44 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:47 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:50 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:53 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:56 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:03:58 INFO DataSourceFactory$: DataSource Jdbc URL: jdbc:mariadb://<DELETED FOR JIRA PURPOSES>
      24/02/06 10:03:58 INFO HikariDataSource: metastore-monitor - Starting...
      24/02/06 10:03:58 INFO HikariDataSource: metastore-monitor - Start completed.
      24/02/06 10:03:58 INFO HikariDataSource: metastore-monitor - Shutdown initiated...
      24/02/06 10:03:58 INFO HikariDataSource: metastore-monitor - Shutdown completed.
      24/02/06 10:03:58 INFO MetastoreMonitor: Metastore healthcheck successful (connection duration = 302 milliseconds)
      24/02/06 10:03:59 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:02 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:05 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:08 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:11 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:13 INFO HiveMetaStore: 1: get_database: default
      24/02/06 10:04:13 INFO audit: ugi=root    ip=unknown-ip-addr    cmd=get_database: default    
      24/02/06 10:04:13 INFO DriverCorral: DBFS health check ok
      24/02/06 10:04:13 INFO DriverCorral: Metastore health check ok
      24/02/06 10:04:14 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:17 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:20 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:23 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:26 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:29 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:32 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:35 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:38 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:41 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:44 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:47 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:50 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:53 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:56 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:04:59 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:02 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:05 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:08 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:11 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:14 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:17 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:20 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:23 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:26 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:29 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:32 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:35 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:38 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      24/02/06 10:05:41 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
      
      == Parsed Logical Plan ==
      Relation [SequenceNumber#451L,Offset#452,EnqueuedTimeUtc#453,SystemProperties#454,Properties#455,Body#456] avro

      == Analyzed Logical Plan ==
      SequenceNumber: bigint, Offset: string, EnqueuedTimeUtc: string, SystemProperties: map<string,struct<member0:bigint,member1:double,member2:string,member3:binary>>, Properties: map<string,struct<member0:bigint,member1:double,member2:string,member3:binary>>, Body: binary
      Relation [SequenceNumber#451L,Offset#452,EnqueuedTimeUtc#453,SystemProperties#454,Properties#455,Body#456] avro

      == Optimized Logical Plan ==
      Relation [SequenceNumber#451L,Offset#452,EnqueuedTimeUtc#453,SystemProperties#454,Properties#455,Body#456] avro

      == Physical Plan ==
      FileScan avro [SequenceNumber#451L,Offset#452,EnqueuedTimeUtc#453,SystemProperties#454,Properties#455,Body#456] Batched: false, DataFilters: [], Format: Avro, Location: InMemoryFileIndex(1 paths)[abfss://<container>@<storage>.dfs.core..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<SequenceNumber:bigint,Offset:string,EnqueuedTimeUtc:string,SystemProperties:map<string,str...
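
      In the log fragment, the hang shows up as a "Starting task" line that is never followed by a matching "Finished task" line, while only periodic ClusterLoadAvgHelper messages continue. A rough sketch for spotting this in a driver log is shown below. It assumes the `yy/MM/dd HH:mm:ss LEVEL Logger: message` layout seen in the fragment and the usual TaskSetManager wording for finished tasks; the function name `unfinished_tasks` is hypothetical.

```python
import re
from datetime import datetime

# Timestamp, level, logger name, and message of one log line.
LOG_LINE = re.compile(
    r"^\s*(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) (\w+) ([^:]+): (.*)$"
)
# Task start/finish messages as written by TaskSetManager.
TASK_EVENT = re.compile(r"^(Starting|Finished) task \S+ in stage \S+ \(TID (\d+)\)")


def unfinished_tasks(log_text):
    """Map each TID that started but never finished to the seconds elapsed
    between its start and the last timestamp in the log."""
    started = {}
    last_seen = None
    for line in log_text.splitlines():
        match = LOG_LINE.match(line)
        if not match:
            continue  # skip query-plan lines and other non-log text
        timestamp = datetime.strptime(match.group(1), "%y/%m/%d %H:%M:%S")
        last_seen = timestamp
        event = TASK_EVENT.match(match.group(4))
        if not event:
            continue
        tid = int(event.group(2))
        if event.group(1) == "Starting":
            started[tid] = timestamp
        else:
            started.pop(tid, None)
    return {
        tid: (last_seen - start).total_seconds()
        for tid, start in started.items()
    }
```

      Applied to the fragment above, this would report TID 449 as unfinished, started at 10:03:12 with the log still running at 10:05:41.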
       

      Attachments

        1. second=02.avro
          0.5 kB
          Pavlo Pohrrebnyi

            People

              Assignee: Ivan Sadikov (ivan.sadikov)
              Reporter: Kamil Kandzia (kamilkandzia)
              Votes: 1
              Watchers: 4
