Spark / SPARK-37176

JsonSource's infer should have the same exception-handling logic as JacksonParser's parse logic


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.3.0
    • Fix Version/s: 3.3.0
    • Component/s: SQL
    • Labels: None

    Description

      JacksonParser's exception-handling logic differs from the logic in org.apache.spark.sql.catalyst.json.JsonInferSchema#infer. The difference can be seen below:

      // JacksonParser's parse logic
      try {
        Utils.tryWithResource(createParser(factory, record)) { parser =>
          // a null first token is equivalent to testing for input.trim.isEmpty
          // but it works on any token stream and not just strings
          parser.nextToken() match {
            case null => None
            case _ => rootConverter.apply(parser) match {
              case null => throw QueryExecutionErrors.rootConverterReturnNullError()
              case rows => rows.toSeq
            }
          }
        }
      } catch {
        case e: SparkUpgradeException => throw e
        case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
          // JSON parser currently doesn't support partial results for corrupted records.
          // For such records, all fields other than the field configured by
          // `columnNameOfCorruptRecord` are set to `null`.
          throw BadRecordException(() => recordLiteral(record), () => None, e)
        case e: CharConversionException if options.encoding.isEmpty =>
          val msg =
            """JSON parser cannot handle a character in its input.
              |Specifying encoding as an input option explicitly might help to resolve the issue.
              |""".stripMargin + e.getMessage
          val wrappedCharException = new CharConversionException(msg)
          wrappedCharException.initCause(e)
          throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
        case PartialResultException(row, cause) =>
          throw BadRecordException(
            record = () => recordLiteral(record),
            partialResult = () => Some(row),
            cause)
      }
      

      vs.

      // JsonInferSchema's infer logic
      val mergedTypesFromPartitions = json.mapPartitions { iter =>
        val factory = options.buildJsonFactory()
        iter.flatMap { row =>
          try {
            Utils.tryWithResource(createParser(factory, row)) { parser =>
              parser.nextToken()
              Some(inferField(parser))
            }
          } catch {
            case e @ (_: RuntimeException | _: JsonProcessingException) => parseMode match {
              case PermissiveMode =>
                Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType))))
              case DropMalformedMode =>
                None
              case FailFastMode =>
                throw QueryExecutionErrors.malformedRecordsDetectedInSchemaInferenceError(e)
            }
          }
        }.reduceOption(typeMerger).toIterator
      }
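To make the difference concrete, the following Scala sketch models, outside of Spark, what each excerpt above does with a given exception. It is illustrative only: StandInUpgradeException and StandInJsonProcessingException are hypothetical stand-ins for org.apache.spark.SparkUpgradeException and Jackson's JsonProcessingException so that the snippet compiles with only the JDK, and the first stand-in assumes (as in Spark 3.x) that SparkUpgradeException extends RuntimeException. PartialResultException is left out for brevity.

```scala
import java.io.{CharConversionException, IOException}
import java.nio.charset.MalformedInputException

// Hypothetical stand-ins (NOT the real classes) so this compiles without Spark or Jackson.
// Assumption: the real SparkUpgradeException extends RuntimeException.
class StandInUpgradeException(msg: String) extends RuntimeException(msg)
// The real JsonProcessingException is an IOException subclass; so is this stand-in.
class StandInJsonProcessingException(msg: String) extends IOException(msg)

object CatchComparison {
  // Mirrors the order of the case clauses in the JacksonParser parse excerpt.
  // encodingIsEmpty models `options.encoding.isEmpty`.
  def parseOutcome(e: Throwable, encodingIsEmpty: Boolean): String = e match {
    case _: StandInUpgradeException => "rethrown as-is"
    case _: RuntimeException | _: StandInJsonProcessingException | _: MalformedInputException =>
      "BadRecordException"
    case _: CharConversionException if encodingIsEmpty =>
      "BadRecordException (wrapped message)"
    case _ => "propagates unhandled"
  }

  // Mirrors the single case clause in the JsonInferSchema infer excerpt.
  // An upgrade exception matches RuntimeException here, so it is handled by the parse mode.
  def inferOutcome(e: Throwable): String = e match {
    case _: RuntimeException | _: StandInJsonProcessingException => "handled by parse mode"
    case _ => "propagates unhandled"
  }
}
```

For instance, CatchComparison.parseOutcome(new MalformedInputException(1), encodingIsEmpty = true) yields "BadRecordException", whereas CatchComparison.inferOutcome(new MalformedInputException(1)) yields "propagates unhandled".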
      
      

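      The two code paths should share the same exception-handling logic; otherwise, the inconsistency may confuse users. For example, parse rethrows SparkUpgradeException unchanged and turns MalformedInputException, and CharConversionException when no encoding is specified, into a BadRecordException, whereas infer catches only RuntimeException and JsonProcessingException. As a result, during schema inference a SparkUpgradeException is handled according to the parse mode, and the two character-encoding exceptions escape unhandled.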
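One way to remove the inconsistency is to classify exceptions in a single place and reuse that classification in both code paths. The Scala sketch below is a hypothetical illustration of that idea, not the change that resolved this issue: isMalformedRecord and inferOne are invented names, the Mode objects stand in for Spark's parse modes, IllegalStateException stands in for QueryExecutionErrors.malformedRecordsDetectedInSchemaInferenceError, and the two StandIn exception classes replace the Spark and Jackson types so the snippet needs only the JDK.

```scala
import java.io.{CharConversionException, IOException}
import java.nio.charset.MalformedInputException

// Hypothetical stand-ins (NOT the real Spark/Jackson classes).
class StandInUpgradeException(msg: String) extends RuntimeException(msg)
class StandInJsonProcessingException(msg: String) extends IOException(msg)

// Local stand-ins for Spark's PermissiveMode / DropMalformedMode / FailFastMode.
sealed trait Mode
case object Permissive extends Mode
case object DropMalformed extends Mode
case object FailFast extends Mode

object UnifiedHandling {
  // Shared predicate: true when `e` should be treated as a malformed record.
  // It follows the exception types the parse excerpt turns into BadRecordException.
  def isMalformedRecord(e: Throwable, encodingIsEmpty: Boolean): Boolean = e match {
    case _: StandInUpgradeException => false // never swallowed, always rethrown
    case _: RuntimeException | _: StandInJsonProcessingException | _: MalformedInputException => true
    case _: CharConversionException => encodingIsEmpty
    case _ => false
  }

  // Schema-inference side: infers a type for one record and applies the mode only
  // to exceptions the shared predicate classifies as malformed. Any other exception
  // is not matched by the catch clause and therefore propagates.
  def inferOne[T](record: String, encodingIsEmpty: Boolean, mode: Mode, corruptRecordType: T)(
      infer: String => T): Option[T] =
    try Some(infer(record))
    catch {
      case e: Throwable if isMalformedRecord(e, encodingIsEmpty) =>
        mode match {
          case Permissive    => Some(corruptRecordType)
          case DropMalformed => None
          case FailFast =>
            throw new IllegalStateException("Malformed records detected in schema inference", e)
        }
    }
}
```

The design choice here is that inference follows parse: an upgrade exception is never swallowed, and a CharConversionException counts as a malformed record only when no encoding was set, as in the parse excerpt. The wrapped error message that parse builds for CharConversionException is omitted for brevity.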


          People

            Assignee: advancedxy YE
            Reporter: advancedxy YE
            Votes: 0
            Watchers: 3
